Skip to content
Draft
24 changes: 21 additions & 3 deletions src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ ovms_cc_library(
"kfs_backend_impl",
"tfs_backend_impl",
"anonymous_input_name",
"libovms_servable_name_checker",
"libovms_metric_provider",
] + select({
"//:not_disable_cloud": [
"libovmsazurefilesystem",
Expand Down Expand Up @@ -1188,7 +1190,8 @@ ovms_cc_library(
name = "libovmsstatus",
hdrs = ["status.hpp",],
srcs = ["status.cpp",],
deps = ["libovmslogging"],
deps = ["libovmslogging",
"@fmtlib",],
visibility = ["//visibility:public"],
)

Expand All @@ -1203,6 +1206,20 @@ ovms_cc_library(
visibility = ["//visibility:public",],
)

# Target that publishes only the servable_name_checker.hpp header: it lists
# no srcs and an empty deps list, and is visible to every package.
# NOTE(review): presumably exists so code can depend on the name-checking
# interface without pulling in a heavier target - confirm against the header.
ovms_cc_library(
name = "libovms_servable_name_checker",
hdrs = ["servable_name_checker.hpp",],
deps = [],
visibility = ["//visibility:public"],
)

# Target that publishes only the metric_provider.hpp header: it lists no
# srcs and an empty deps list, and is visible to every package.
# NOTE(review): if metric_provider.hpp includes other project headers, the
# matching targets should be listed in deps - confirm against the header.
ovms_cc_library(
name = "libovms_metric_provider",
hdrs = ["metric_provider.hpp",],
deps = [],
visibility = ["//visibility:public"],
)

ovms_cc_library( # make ovms_lib dependent, use shared options
name = "libovmsstring_utils",
hdrs = ["stringutils.hpp",],
Expand Down Expand Up @@ -1718,6 +1735,7 @@ ovms_cc_library(
hdrs = ["modelversionstatus.hpp",],
srcs = ["modelversionstatus.cpp",],
deps = [
"@fmtlib",
"libovmslogging",
"libovmsmodelversion",
],
Expand Down Expand Up @@ -1772,12 +1790,12 @@ ovms_cc_library(
name = "nodeinfo",
hdrs = ["dags/nodeinfo.hpp",],
deps = [
"@fmtlib",
"libovms_threadsafequeue",
"libovmsmodelversion",
"libovms_tensorinfo",
"node_library",
"libovms_dags_aliases",
"libovmslogging",
],
visibility = ["//visibility:public"],
)
Expand All @@ -1797,7 +1815,7 @@ ovms_cc_library(
deps = [
"libovmsstatus",
"@com_github_grpc_grpc//:grpc++",
"libovmslogging",
"@fmtlib",
],
visibility = ["//visibility:public"],
)
Expand Down
1 change: 0 additions & 1 deletion src/azurestorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <string>
#include <vector>

#include "logging.hpp"
#include "status.hpp"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wall"
Expand Down
4 changes: 3 additions & 1 deletion src/capi_frontend/capi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
#pragma warning(pop)

#include "../dags/pipeline.hpp"
#include "../dags/pipeline_factory.hpp"
#include "../dags/pipelinedefinition.hpp"
#include "../dags/pipelinedefinitionstatus.hpp"
#include "../dags/pipelinedefinitionunloadguard.hpp"
#include "../execution_context.hpp"
#include "../version.hpp"
#if (MEDIAPIPE_DISABLE == 0)
#include "../mediapipe_internal/mediapipefactory.hpp"
#include "../mediapipe_internal/mediapipegraphdefinition.hpp"
#endif
#include "../model_service.hpp"
Expand Down Expand Up @@ -120,7 +122,7 @@ static Status getPipeline(ovms::Server& server, const InferenceRequest* request,
if (!status.ok()) {
return status;
}
return modelManager->createPipeline(pipelinePtr, request->getServableName(), request, response);
return modelManager->getPipelineFactory().create(pipelinePtr, request->getServableName(), request, response, *modelManager);
}

static Status getPipelineDefinition(Server& server, const std::string& servableName, PipelineDefinition** pipelineDefinition, std::unique_ptr<PipelineDefinitionUnloadGuard>& unloadGuard) {
Expand Down
2 changes: 1 addition & 1 deletion src/capi_frontend/capi_request_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include <vector>

#include "../ovms.h" // NOLINT
#include "src/logging.hpp"
#include "../precision.hpp"
#include "inferencerequest.hpp"
#include "../shape.hpp"
#include "../logging.hpp"
#include "../status.hpp" // TODO move impl @atobisze
#include "../extractchoice.hpp"
#include "../requesttensorextractor.hpp"
Expand Down
2 changes: 2 additions & 0 deletions src/color_format_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <algorithm>
#include <vector>

#include "logging.hpp"

namespace ovms {

const char ColorFormatConfiguration::COLOR_FORMAT_DELIMITER = ':';
Expand Down
1 change: 0 additions & 1 deletion src/dags/entry_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include <openvino/openvino.hpp>

#include "../logging.hpp"
#include "../ovms.h" // NOLINT
#include "../regularovtensorfactory.hpp"
#include "../tensorinfo.hpp"
Expand Down
1 change: 0 additions & 1 deletion src/dags/gatherexitnodeinputhandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "../capi_frontend/capi_utils.hpp"
#include "../capi_frontend/capi_dag_utils.hpp"
#include "../kfs_frontend/kfs_utils.hpp"
#include "../logging.hpp"
#include "../profiler.hpp"
#include "../status.hpp"
#include "../tfs_frontend/tfs_utils.hpp"
Expand Down
3 changes: 2 additions & 1 deletion src/dags/nodeinfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
#include <utility>
#include <vector>

#include <fmt/format.h>

#include "../modelversion.hpp"
#include "../tensorinfo.hpp"
#include "aliases.hpp"
#include "node_library.hpp"
#include "../logging.hpp"

namespace ovms {

Expand Down
11 changes: 3 additions & 8 deletions src/dags/pipelinedefinition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <thread>

#include "../logging.hpp"
#include "../model.hpp"
#include "../model_metric_reporter.hpp"
#include "../modelinstance.hpp"
#include "../modelinstanceunloadguard.hpp"
Expand Down Expand Up @@ -68,16 +69,10 @@ PipelineDefinition::PipelineDefinition(const std::string& pipelineName,
Status PipelineDefinition::validate(ModelManager& manager) {
SPDLOG_LOGGER_DEBUG(modelmanager_logger, "Started validation of pipeline: {}", getName());
ValidationResultNotifier notifier(status, loadedNotify);
if (manager.modelExists(this->pipelineName)) {
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Pipeline name: {} is already occupied by model.", pipelineName);
if (manager.servableExists(this->pipelineName, ServableType::Model | ServableType::Mediapipe)) {
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Pipeline name: {} is already occupied by model or mediapipe graph.", pipelineName);
return StatusCode::PIPELINE_NAME_OCCUPIED;
}
#if (MEDIAPIPE_DISABLE == 0)
if (manager.getMediapipeFactory().definitionExists(this->pipelineName)) {
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Pipeline name: {} is already occupied by mediapipe graph.", pipelineName);
return StatusCode::PIPELINE_NAME_OCCUPIED;
}
#endif
Status validationResult = initializeNodeResources(manager);
if (!validationResult.ok()) {
return validationResult;
Expand Down
2 changes: 2 additions & 0 deletions src/get_model_metadata_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

#include <google/protobuf/util/json_util.h>

#include "dags/pipeline_factory.hpp"
#include "dags/pipelinedefinition.hpp"
#include "dags/pipelinedefinitionstatus.hpp"
#include "dags/pipelinedefinitionunloadguard.hpp"
#include "execution_context.hpp"
#include "model.hpp"
#include "modelinstance.hpp"
#include "modelinstanceunloadguard.hpp"
#include "modelmanager.hpp"
Expand Down
4 changes: 2 additions & 2 deletions src/grpc_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
//*****************************************************************************
#pragma once
#include <string>
#include <grpcpp/server_context.h>

#include "logging.hpp"
#include <fmt/format.h>
#include <grpcpp/server_context.h>

namespace ovms {
class Status;
Expand Down
6 changes: 4 additions & 2 deletions src/http_rest_api_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#include "config.hpp"
#include "dags/pipeline.hpp"
#include "dags/pipeline_factory.hpp"
#include "dags/pipelinedefinition.hpp"
#include "dags/pipelinedefinitionunloadguard.hpp"
#include "execution_context.hpp"
Expand Down Expand Up @@ -68,6 +69,7 @@
#include "http_payload.hpp"
#include "http_frontend/http_client_connection.hpp"
#include "http_frontend/http_graph_executor_impl.hpp"
#include "mediapipe_internal/mediapipefactory.hpp"
#include "mediapipe_internal/mediapipegraphexecutor.hpp"
#endif

Expand Down Expand Up @@ -1153,7 +1155,7 @@ Status HttpRestApiHandler::processPredictRequest(
if (this->modelManager.modelExists(modelName)) {
SPDLOG_DEBUG("Found model with name: {}. Searching for requested version...", modelName);
status = processSingleModelRequest(modelName, modelVersion, request, requestOrder, responseProto, reporterOut);
} else if (this->modelManager.pipelineDefinitionExists(modelName)) {
} else if (this->modelManager.servableExists(modelName, ServableType::Pipeline)) {
SPDLOG_DEBUG("Found pipeline with name: {}", modelName);
status = processPipelineRequest(modelName, request, requestOrder, responseProto, reporterOut);
} else {
Expand Down Expand Up @@ -1288,7 +1290,7 @@ Status HttpRestApiHandler::processPipelineRequest(const std::string& modelName,

tensorflow::serving::PredictRequest& requestProto = requestParser.getProto();
requestProto.mutable_model_spec()->set_name(modelName);
status = this->modelManager.createPipeline(pipelinePtr, modelName, &requestProto, &responseProto);
status = this->modelManager.getPipelineFactory().create(pipelinePtr, modelName, &requestProto, &responseProto, this->modelManager);
if (!status.ok()) {
INCREMENT_IF_ENABLED(reporterOut->getInferRequestMetric(executionContext, false));
return status;
Expand Down
2 changes: 2 additions & 0 deletions src/image_gen/imagegen_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <utility>
#include <vector>

#include <fmt/ranges.h>

#include "absl/strings/str_replace.h"
#include "absl/strings/ascii.h"

Expand Down
1 change: 0 additions & 1 deletion src/inference_request_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <sstream>
#include <string>

#include "logging.hpp"
#include "shape.hpp"
#include "anonymous_input_name.hpp"
#include "status.hpp"
Expand Down
5 changes: 4 additions & 1 deletion src/kfs_frontend/kfs_grpc_inference_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "kfs_utils.hpp"
#include "kfs_request_utils.hpp"
#include "../dags/pipeline.hpp"
#include "../dags/pipeline_factory.hpp"
#include "../dags/pipelinedefinition.hpp"
#include "../dags/pipelinedefinitionstatus.hpp"
#include "../dags/pipelinedefinitionunloadguard.hpp"
Expand All @@ -36,11 +37,13 @@
// kfs_graph_executor_impl needs to be included before mediapipegraphexecutor
// because it contains functions required by graph execution template
#include "kfs_graph_executor_impl.hpp"
#include "../mediapipe_internal/mediapipefactory.hpp"
#include "../mediapipe_internal/mediapipegraphdefinition.hpp"
#include "../mediapipe_internal/mediapipegraphexecutor.hpp"
// clang-format on
#endif
#include "../metric.hpp"
#include "../model.hpp"
#include "../modelinstance.hpp"
#include "../deserialization_main.hpp"
#include "../inference_executor.hpp"
Expand Down Expand Up @@ -85,7 +88,7 @@ Status KFSInferenceServiceImpl::getPipeline(const KFSRequest* request,
KFSResponse* response,
std::unique_ptr<ovms::Pipeline>& pipelinePtr) {
OVMS_PROFILE_FUNCTION();
return this->modelManager.createPipeline(pipelinePtr, request->model_name(), request, response);
return this->modelManager.getPipelineFactory().create(pipelinePtr, request->model_name(), request, response, this->modelManager);
}

const std::string PLATFORM = "OpenVINO";
Expand Down
1 change: 0 additions & 1 deletion src/kfs_frontend/validation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "kfs_utils.hpp"
#include "../precision.hpp"
#include "../predict_request_validation_utils.hpp"
#include "../logging.hpp"
#include "../profiler.hpp"
#include "../tensorinfo.hpp"
#include "../status.hpp"
Expand Down
2 changes: 2 additions & 0 deletions src/llm/apis/openai_completions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <set>
#include <string.h>

#include <fmt/ranges.h>

#include "openai_json_response.hpp"

#include "../../logging.hpp"
Expand Down
3 changes: 3 additions & 0 deletions src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
//*****************************************************************************
#include "logging.hpp"

#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_sinks.h>

#if (MEDIAPIPE_DISABLE == 0)
#include <glog/logging.h>
#endif
Expand Down
3 changes: 0 additions & 3 deletions src/logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
#include <memory>
#include <string>

#include <fmt/ranges.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <spdlog/spdlog.h>

namespace ovms {
Expand Down
25 changes: 13 additions & 12 deletions src/mediapipe_internal/mediapipefactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
#pragma warning(pop)
#include "../kfs_frontend/kfs_grpc_inference_service.hpp"
#include "../logging.hpp"
#include "../modelmanager.hpp"
#include "../metric_provider.hpp"
#include "../servable_name_checker.hpp"
#include "../status.hpp"
#include "../stringutils.hpp"
#pragma warning(push)
Expand Down Expand Up @@ -62,13 +63,14 @@ MediapipeFactory::MediapipeFactory(PythonBackend* pythonBackend) {

Status MediapipeFactory::createDefinition(const std::string& pipelineName,
const MediapipeGraphConfig& config,
ModelManager& manager) {
MetricProvider& metrics,
const ServableNameChecker& checker) {
if (definitionExists(pipelineName)) {
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Mediapipe graph definition: {} is already created", pipelineName);
return StatusCode::PIPELINE_DEFINITION_ALREADY_EXIST;
}
std::shared_ptr<MediapipeGraphDefinition> graphDefinition = std::make_shared<MediapipeGraphDefinition>(pipelineName, config, manager.getMetricRegistry(), &manager.getMetricConfig(), pythonBackend);
auto stat = graphDefinition->validate(manager);
std::shared_ptr<MediapipeGraphDefinition> graphDefinition = std::make_shared<MediapipeGraphDefinition>(pipelineName, config, metrics.getMetricRegistry(), &metrics.getMetricConfig(), pythonBackend);
auto stat = graphDefinition->validate(checker);
if (stat.getCode() == StatusCode::MEDIAPIPE_GRAPH_NAME_OCCUPIED) {
return stat;
}
Expand All @@ -94,19 +96,18 @@ MediapipeGraphDefinition* MediapipeFactory::findDefinitionByName(const std::stri

// Reloads an already registered mediapipe graph definition with a new config.
//
// Looks the definition up by `name` via findDefinitionByName(). When no such
// definition exists, an error is logged and StatusCode::INTERNAL_ERROR is
// returned. Otherwise an info message is logged and the result of the
// definition's own reload() is returned unchanged.
//
// NOTE(review): this span is taken from a rendered diff, so the last
// parameter line and the final `return mgd->reload(...)` line each appear
// twice (a ModelManager& variant and a const ServableNameChecker& variant).
// Apparently the ModelManager variant is the removed one; only one line of
// each pair exists in the real file.
Status MediapipeFactory::reloadDefinition(const std::string& name,
const MediapipeGraphConfig& config,
ModelManager& manager) {
const ServableNameChecker& checker) {
auto mgd = findDefinitionByName(name);
if (mgd == nullptr) {
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Requested to reload mediapipe graph definition but it does not exist: {}", name);
return StatusCode::INTERNAL_ERROR;
}
SPDLOG_LOGGER_INFO(modelmanager_logger, "Reloading mediapipe graph: {}", name);
return mgd->reload(checker, config);
}

Status MediapipeFactory::create(std::unique_ptr<MediapipeGraphExecutor>& pipeline,
const std::string& name,
ModelManager& manager) const {
const std::string& name) const {
std::shared_lock lock(definitionsMtx);
auto it = definitions.find(name);
if (it == definitions.end()) {
Expand All @@ -117,17 +118,17 @@ Status MediapipeFactory::create(std::unique_ptr<MediapipeGraphExecutor>& pipelin
return definition.create(pipeline);
}

// Retires every stored graph definition that is not listed in
// `graphsInConfigFile`.
//
// Walks all entries of `definitions`; an entry is retired when its name is
// absent from `graphsInConfigFile` and its state code is not already
// PipelineDefinitionStateCode::RETIRED.
//
// NOTE(review): this span is taken from a rendered diff, so the signature,
// the lambda capture list and the retire() call each appear twice (with and
// without the ModelManager argument). Only one line of each pair exists in
// the real file.
// NOTE(review): no lock on definitionsMtx is taken in the visible body,
// whereas MediapipeFactory::create() holds a shared_lock on it - confirm that
// callers serialize access.
void MediapipeFactory::retireOtherThan(std::set<std::string>&& graphsInConfigFile, ModelManager& manager) {
void MediapipeFactory::retireOtherThan(std::set<std::string>&& graphsInConfigFile) {
std::for_each(definitions.begin(),
definitions.end(),
[&graphsInConfigFile, &manager](auto& nameDefinitionPair) {
[&graphsInConfigFile](auto& nameDefinitionPair) {
if (graphsInConfigFile.find(nameDefinitionPair.second->getName()) == graphsInConfigFile.end() && nameDefinitionPair.second->getStateCode() != PipelineDefinitionStateCode::RETIRED) {
nameDefinitionPair.second->retire(manager);
nameDefinitionPair.second->retire();
}
});
}

// Placeholder for revalidating mediapipe graphs: performs no validation,
// only logs a warning that the feature is not implemented, and always
// returns StatusCode::OK.
//
// NOTE(review): this span is taken from a rendered diff, so the signature
// appears twice (with an unnamed ModelManager& parameter and with no
// parameters). Only one of the two lines exists in the real file.
Status MediapipeFactory::revalidatePipelines(ModelManager&) {
Status MediapipeFactory::revalidatePipelines() {
SPDLOG_LOGGER_WARN(modelmanager_logger, "revalidation of mediapipe graphs not implemented yet");
return StatusCode::OK;
}
Expand Down
Loading