Skip to content
Open
4 changes: 1 addition & 3 deletions controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "c2/ControllerSocketProtocol.h"
#include "controllers/SSLContextService.h"
#include "core/ConfigurationFactory.h"
#include "minifi-cpp/core/controller/ControllerService.h"
#include "core/extension/ExtensionManager.h"
#include "properties/Configure.h"
#include "range/v3/algorithm/contains.hpp"
Expand All @@ -42,8 +41,7 @@ std::shared_ptr<minifi::controllers::SSLContextServiceInterface> getSSLContextSe
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> secure_context;
std::string secure_str;
if (configuration->get(minifi::Configure::nifi_remote_input_secure, secure_str) && minifi::utils::string::toBool(secure_str).value_or(false)) {
secure_context = std::make_shared<minifi::controllers::SSLContextService>("ControllerSocketProtocolSSL", configuration);
secure_context->onEnable();
secure_context = minifi::controllers::SSLContextService::createAndEnable("ControllerSocketProtocolSSL", configuration);
}

return secure_context;
Expand Down
3 changes: 1 addition & 2 deletions controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ class ControllerTestFixture {
configuration_->set(minifi::Configure::nifi_security_client_private_key, (minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "minifi-cpp-flow.key").string());
configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, "abcdefgh");
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, (minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "root-ca.pem").string());
ssl_context_service_ = std::make_shared<controllers::SSLContextService>("SSLContextService", configuration_);
ssl_context_service_->onEnable();
ssl_context_service_ = controllers::SSLContextService::createAndEnable("SSLContextService", configuration_);
controller_socket_data_.host = "localhost";
controller_socket_data_.port = 9997;
}
Expand Down
6 changes: 6 additions & 0 deletions core-framework/include/core/Resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "utils/OptionalUtils.h"
#include "utils/Macro.h"
#include "core/ProcessorFactoryImpl.h"
#include "core/controller/ControllerServiceFactoryImpl.h"
#include "core/ObjectFactory.h"
#include "minifi-cpp/agent/agent_version.h"

Expand Down Expand Up @@ -62,6 +63,11 @@ class StaticClassType {
auto factory = std::unique_ptr<ProcessorFactory>(new ProcessorFactoryImpl<Class>(module_name));
getClassLoader().registerClass(construction_name, std::move(factory));
}
} else if constexpr (Type == ResourceType::ControllerService) {
for (const auto& construction_name : construction_names_) {
auto factory = std::unique_ptr<controller::ControllerServiceFactory>(new controller::ControllerServiceFactoryImpl<Class>(module_name));
getClassLoader().registerClass(construction_name, std::move(factory));
}
} else {
for (const auto& construction_name : construction_names_) {
auto factory = std::unique_ptr<ObjectFactory>(new DefaultObjectFactory<Class>(module_name));
Expand Down
121 changes: 0 additions & 121 deletions core-framework/include/core/controller/ControllerService.h

This file was deleted.

113 changes: 113 additions & 0 deletions core-framework/include/core/controller/ControllerServiceBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "minifi-cpp/properties/Configure.h"
#include "core/Core.h"
#include "core/ConfigurableComponentImpl.h"
#include "core/Connectable.h"
#include "minifi-cpp/core/controller/ControllerServiceApi.h"
#include "minifi-cpp/core/controller/ControllerServiceInterface.h"
#include "minifi-cpp/core/ControllerServiceApiDefinition.h"
#include "minifi-cpp/core/controller/ControllerServiceMetadata.h"

namespace org::apache::nifi::minifi::core::controller {

// A base class that helps with controller service development, contains common functionalities.
class ControllerServiceBase : public ControllerServiceApi {
public:
explicit ControllerServiceBase(ControllerServiceMetadata metadata)
: name_(std::move(metadata.name)),
uuid_(metadata.uuid),
logger_(std::move(metadata.logger)) {}

virtual void initialize() {}

void initialize(ControllerServiceDescriptor& descriptor) final {
gsl_Expects(!descriptor_);
descriptor_ = &descriptor;
auto guard = gsl::finally([&] {descriptor_ = nullptr;});
initialize();
}

void setSupportedProperties(std::span<const PropertyReference> properties) {
gsl_Expects(descriptor_);
descriptor_->setSupportedProperties(properties);
}

~ControllerServiceBase() override {}

virtual void onEnable() {}

/**
* Function is called when Controller Services are enabled and being run
*/
void onEnable(ControllerServiceContext& context, const std::shared_ptr<Configure>& configuration, const std::vector<std::shared_ptr<ControllerServiceInterface>>& linked_services) final {
configuration_ = configuration;
linked_services_ = linked_services;
gsl_Expects(!context_);
context_ = &context;
auto guard = gsl::finally([&] {context_ = nullptr;});
onEnable();
}

[[nodiscard]] nonstd::expected<std::string, std::error_code> getProperty(std::string_view name) const {
gsl_Expects(context_);
return context_->getProperty(name);
}

[[nodiscard]] nonstd::expected<std::vector<std::string>, std::error_code> getAllPropertyValues(std::string_view name) const {
gsl_Expects(context_);
return context_->getAllPropertyValues(name);
}

/**
* Function is called when Controller Services are disabled
*/
void notifyStop() override {}

std::string getName() const {
return name_;
}

utils::Identifier getUUID() const {
return uuid_;
}


static constexpr auto ImplementsApis = std::array<ControllerServiceApiDefinition, 0>{};

protected:
std::string name_;
utils::Identifier uuid_;
std::vector<std::shared_ptr<controller::ControllerServiceInterface> > linked_services_;
std::shared_ptr<Configure> configuration_;
// valid during initialize, sink for supported properties
ControllerServiceDescriptor* descriptor_{nullptr};
// valid during onEnable, provides property access
ControllerServiceContext* context_{nullptr};
Comment on lines 106 to 108
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment describing the lifetimes (what guarantees that these will remain alive throughout the ControllerService lifetime) would be nice here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment


std::shared_ptr<core::logging::Logger> logger_;
};

} // namespace org::apache::nifi::minifi::core::controller
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <memory>
#include <utility>
#include "core/ClassName.h"
#include "minifi-cpp/core/controller/ControllerServiceFactory.h"

namespace org::apache::nifi::minifi::core::controller {

template<class T>
class ControllerServiceFactoryImpl : public ControllerServiceFactory {
public:
ControllerServiceFactoryImpl()
: class_name_(core::className<T>()) {
}

explicit ControllerServiceFactoryImpl(std::string group_name)
: group_name_(std::move(group_name)),
class_name_(core::className<T>()) {
}

std::string getGroupName() const override {
return group_name_;
}

std::unique_ptr<ControllerServiceApi> create(ControllerServiceMetadata metadata) override {
return std::make_unique<T>(metadata);
}

std::string getClassName() const override {
return std::string{class_name_};
}

protected:
std::string group_name_;
std::string_view class_name_;
};

} // namespace org::apache::nifi::minifi::core::controller
21 changes: 1 addition & 20 deletions core-framework/include/utils/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
#include "MinifiConcurrentQueue.h"
#include "Monitors.h"
#include "core/expect.h"
#include "minifi-cpp/controllers/ThreadManagementService.h"
#include "minifi-cpp/core/controller/ControllerServiceLookup.h"
#include "minifi-cpp/core/logging/Logger.h"

namespace org::apache::nifi::minifi::utils {
Expand Down Expand Up @@ -137,8 +135,7 @@ class WorkerThread {
*/
class ThreadPool {
public:
ThreadPool(int max_worker_threads = 2,
core::controller::ControllerServiceLookup* controller_service_provider = nullptr, std::string name = "NamelessPool");
explicit ThreadPool(int max_worker_threads = 2, std::string name = "NamelessPool");

ThreadPool(const ThreadPool &other) = delete;
ThreadPool& operator=(const ThreadPool &other) = delete;
Expand Down Expand Up @@ -231,20 +228,6 @@ class ThreadPool {
start();
}

void setControllerServiceProvider(core::controller::ControllerServiceLookup* controller_service_provider) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
bool was_running = running_;
if (was_running) {
shutdown();
}
controller_service_provider_ = controller_service_provider;
if (was_running)
start();
}

private:
std::shared_ptr<controllers::ThreadManagementService> createThreadManager() const;

protected:
std::thread createThread(std::function<void()> &&functor) {
return std::thread([ functor ]() mutable {
Expand Down Expand Up @@ -272,8 +255,6 @@ class ThreadPool {
std::thread manager_thread_;
std::thread delayed_scheduler_thread_;
std::atomic<bool> running_;
core::controller::ControllerServiceLookup* controller_service_provider_;
std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
ConditionConcurrentQueue<Worker> worker_queue_;
std::priority_queue<Worker, std::vector<Worker>, DelayedTaskComparator> delayed_worker_queue_;
Expand Down
Loading
Loading