Skip to content

Commit d6bf004

Browse files
committed
MINIFICPP-2669 - Remove ThreadManagementService, review changes
1 parent b32ed07 commit d6bf004

16 files changed

Lines changed: 24 additions & 267 deletions

File tree

core-framework/include/core/controller/ControllerServiceBase.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@
3333

3434
namespace org::apache::nifi::minifi::core::controller {
3535

36-
/**
37-
* Controller Service base class that contains some pure virtual methods.
38-
*
39-
* Design: OnEnable is executed when the controller service is being enabled.
40-
* Note that keeping state here must be protected in this function.
41-
*/
36+
// A base class that helps with controller service development, contains common functionalities.
4237
class ControllerServiceBase : public ControllerServiceApi {
4338
public:
4439
explicit ControllerServiceBase(ControllerServiceMetadata metadata)
@@ -107,7 +102,9 @@ class ControllerServiceBase : public ControllerServiceApi {
107102
utils::Identifier uuid_;
108103
std::vector<std::shared_ptr<controller::ControllerServiceInterface> > linked_services_;
109104
std::shared_ptr<Configure> configuration_;
105+
// valid during initialize, sink for supported properties
110106
ControllerServiceDescriptor* descriptor_{nullptr};
107+
// valid during onEnable, provides property access
111108
ControllerServiceContext* context_{nullptr};
112109

113110
std::shared_ptr<core::logging::Logger> logger_;

core-framework/include/utils/ThreadPool.h

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
#include "MinifiConcurrentQueue.h"
3838
#include "Monitors.h"
3939
#include "core/expect.h"
40-
#include "minifi-cpp/controllers/ThreadManagementService.h"
4140
#include "minifi-cpp/core/logging/Logger.h"
4241

4342
namespace org::apache::nifi::minifi::utils {
@@ -136,10 +135,7 @@ class WorkerThread {
136135
*/
137136
class ThreadPool {
138137
public:
139-
using ControllerServiceProvider = std::function<std::shared_ptr<core::controller::ControllerServiceInterface>(std::string_view)>;
140-
141-
ThreadPool(int max_worker_threads = 2,
142-
ControllerServiceProvider controller_service_provider = nullptr, std::string name = "NamelessPool");
138+
ThreadPool(int max_worker_threads = 2, std::string name = "NamelessPool");
143139

144140
ThreadPool(const ThreadPool &other) = delete;
145141
ThreadPool& operator=(const ThreadPool &other) = delete;
@@ -232,20 +228,6 @@ class ThreadPool {
232228
start();
233229
}
234230

235-
void setControllerServiceProvider(ControllerServiceProvider controller_service_provider) {
236-
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
237-
bool was_running = running_;
238-
if (was_running) {
239-
shutdown();
240-
}
241-
controller_service_provider_ = controller_service_provider;
242-
if (was_running)
243-
start();
244-
}
245-
246-
private:
247-
std::shared_ptr<controllers::ThreadManagementService> createThreadManager() const;
248-
249231
protected:
250232
std::thread createThread(std::function<void()> &&functor) {
251233
return std::thread([ functor ]() mutable {
@@ -273,8 +255,6 @@ class ThreadPool {
273255
std::thread manager_thread_;
274256
std::thread delayed_scheduler_thread_;
275257
std::atomic<bool> running_;
276-
ControllerServiceProvider controller_service_provider_;
277-
std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
278258
ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
279259
ConditionConcurrentQueue<Worker> worker_queue_;
280260
std::priority_queue<Worker, std::vector<Worker>, DelayedTaskComparator> delayed_worker_queue_;

core-framework/src/utils/ThreadPool.cpp

Lines changed: 4 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ using namespace std::literals::chrono_literals;
2222

2323
namespace org::apache::nifi::minifi::utils {
2424

25-
ThreadPool::ThreadPool(int max_worker_threads, ControllerServiceProvider controller_service_provider, std::string name)
25+
ThreadPool::ThreadPool(int max_worker_threads, std::string name)
2626
: thread_reduction_count_(0),
2727
max_worker_threads_(max_worker_threads),
2828
current_workers_(0),
2929
running_(false),
30-
controller_service_provider_(std::move(controller_service_provider)),
3130
name_(std::move(name)),
3231
logger_(core::logging::LoggerFactory<ThreadPool>::getLogger()) {
3332
}
@@ -141,71 +140,15 @@ void ThreadPool::manageWorkers() {
141140
}
142141
}
143142

144-
if (nullptr != thread_manager_) {
145-
while (running_) {
146-
auto wait_period = 500ms;
147-
{
148-
std::unique_lock<std::recursive_mutex> manager_lock(manager_mutex_, std::try_to_lock);
149-
if (!manager_lock.owns_lock()) {
150-
// Threadpool is being stopped/started or config is being changed, better wait a bit
151-
std::this_thread::sleep_for(10ms);
152-
continue;
153-
}
154-
if (thread_manager_->isAboveMax(current_workers_)) {
155-
auto max = 1; // thread_manager_->getMaxConcurrentTasks();
156-
auto differential = current_workers_ - max;
157-
thread_reduction_count_ += differential;
158-
} else if (thread_manager_->shouldReduce()) {
159-
if (current_workers_ > 1)
160-
thread_reduction_count_++;
161-
thread_manager_->reduce();
162-
} else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) { // increase slowly
163-
std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_);
164-
auto worker_thread = std::make_shared<WorkerThread>();
165-
worker_thread->thread_ = createThread([this, worker_thread] { run_tasks(worker_thread); });
166-
thread_queue_.push_back(worker_thread);
167-
current_workers_++;
168-
}
169-
std::shared_ptr<WorkerThread> thread_ref;
170-
while (deceased_thread_queue_.tryDequeue(thread_ref)) {
171-
std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_);
172-
if (thread_ref->thread_.joinable())
173-
thread_ref->thread_.join();
174-
thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
175-
}
176-
}
177-
std::this_thread::sleep_for(wait_period);
178-
}
179-
} else {
180-
for (auto &thread : thread_queue_) {
181-
if (thread->thread_.joinable())
182-
thread->thread_.join();
183-
}
184-
}
185-
}
186-
187-
std::shared_ptr<controllers::ThreadManagementService> ThreadPool::createThreadManager() const {
188-
if (!controller_service_provider_) {
189-
return nullptr;
190-
}
191-
auto service = controller_service_provider_("ThreadPoolManager");
192-
if (!service) {
193-
logger_->log_info("Could not find a ThreadPoolManager service");
194-
return nullptr;
143+
for (auto &thread : thread_queue_) {
144+
if (thread->thread_.joinable())
145+
thread->thread_.join();
195146
}
196-
auto thread_manager_service = std::dynamic_pointer_cast<controllers::ThreadManagementService>(service);
197-
if (!thread_manager_service) {
198-
logger_->log_error("Found ThreadPoolManager, but it is not a ThreadManagementService");
199-
return nullptr;
200-
}
201-
return thread_manager_service;
202147
}
203148

204149
void ThreadPool::start() {
205150
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
206151
if (!running_) {
207-
thread_manager_ = createThreadManager();
208-
209152
running_ = true;
210153
worker_queue_.start();
211154
manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);

libminifi/include/controllers/ThreadManagementService.h

Lines changed: 0 additions & 84 deletions
This file was deleted.

libminifi/include/core/controller/ControllerService.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,7 @@ enum ControllerServiceState {
3838
ENABLED
3939
};
4040

41-
/**
42-
* Controller Service base class that contains some pure virtual methods.
43-
*
44-
* Design: OnEnable is executed when the controller service is being enabled.
45-
* Note that keeping state here must be protected in this function.
46-
*/
41+
// Wrapper class to manage common functionalities of controller service implementations (properties, name, id, etc)
4742
class ControllerService : public ConfigurableComponentImpl, public CoreComponentImpl {
4843
class ControllerServiceDescriptorImpl : public ControllerServiceDescriptor {
4944
public:
@@ -128,7 +123,7 @@ class ControllerService : public ConfigurableComponentImpl, public CoreComponent
128123

129124
template<typename T = ControllerServiceInterface>
130125
gsl::not_null<T*> getImplementation() {
131-
return gsl::make_not_null(dynamic_cast<T*>(impl_.get()));
126+
return gsl::make_not_null(dynamic_cast<T*>(impl_->getControllerServiceInterface()));
132127
}
133128

134129
bool supportsDynamicProperties() const final {

libminifi/src/FlowController.cpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
5353
: core::controller::ForwardingControllerServiceProvider(core::className<FlowController>()),
5454
running_(false),
5555
initialized_(false),
56-
thread_pool_(5, nullptr, "Flowcontroller threadpool"),
56+
thread_pool_(5, "Flowcontroller threadpool"),
5757
configuration_(std::move(configure)),
5858
provenance_repo_(std::move(provenance_repo)),
5959
flow_file_repo_(std::move(flow_file_repo)),
@@ -275,12 +275,6 @@ void FlowController::load(bool reload) {
275275
if (!thread_pool_.isRunning() || reload) {
276276
thread_pool_.shutdown();
277277
thread_pool_.setMaxConcurrentTasks(configuration_->getInt(Configure::nifi_flow_engine_threads, 5));
278-
thread_pool_.setControllerServiceProvider([this] (std::string_view name) -> std::shared_ptr<core::controller::ControllerServiceInterface> {
279-
if (auto service = this->getControllerService(std::string{name})) {
280-
return {service, service->getImplementation()};
281-
}
282-
return {};
283-
});
284278
thread_pool_.start();
285279
}
286280

libminifi/src/c2/C2Agent.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ C2Agent::C2Agent(std::shared_ptr<Configure> configuration,
6262
configuration_(std::move(configuration)),
6363
node_reporter_(std::move(node_reporter)),
6464
filesystem_(std::move(filesystem)),
65-
thread_pool_(2, nullptr, "C2 threadpool"),
65+
thread_pool_(2, "C2 threadpool"),
6666
request_restart_(std::move(request_restart)),
6767
last_run_(std::chrono::steady_clock::now()),
6868
asset_manager_(asset_manager) {

libminifi/test/unit/JsonFlowSerializerTests.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ TEST_CASE("The encrypted flow configuration can be decrypted with the correct ke
716716
REQUIRE(controller_service_node_before);
717717
const auto* const controller_service_node_after = process_group_after->findControllerService(controller_service_id);
718718
REQUIRE(controller_service_node_after);
719-
const auto controller_service_after = controller_service_node_before->getControllerServiceImplementation();
719+
const auto controller_service_after = controller_service_node_after->getControllerServiceImplementation();
720720
REQUIRE(controller_service_after);
721721
CHECK(controller_service_before->getProperty("CA Certificate") == controller_service_after->getProperty("CA Certificate"));
722722
CHECK(controller_service_before->getProperty("Passphrase") == controller_service_after->getProperty("Passphrase"));

libminifi/test/unit/YamlFlowSerializerTests.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ TEST_CASE("The encrypted flow configuration can be decrypted with the correct ke
331331
REQUIRE(controller_service_node_before);
332332
const auto* const controller_service_node_after = process_group_after->findControllerService(controller_service_id);
333333
REQUIRE(controller_service_node_after);
334-
const auto controller_service_after = controller_service_node_before->getControllerServiceImplementation();
334+
const auto controller_service_after = controller_service_node_after->getControllerServiceImplementation();
335335
REQUIRE(controller_service_after);
336336
CHECK(controller_service_before->getProperty("CA Certificate") == controller_service_after->getProperty("CA Certificate"));
337337
CHECK(controller_service_before->getProperty("Passphrase") == controller_service_after->getProperty("Passphrase"));

minifi-api/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
add_library(minifi-api-common INTERFACE)
22
target_include_directories(minifi-api-common INTERFACE common/include)
33
target_link_libraries(minifi-api-common INTERFACE gsl-lite)
4+
target_compile_definitions(minifi-api-common INTERFACE MINIFI_VERSION_STR="${MINIFI_VERSION_STR}")
45

56
add_library(minifi-api INTERFACE)
67
target_include_directories(minifi-api INTERFACE include)

0 commit comments

Comments
 (0)