legacy_executor.cpp
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#include "legacy_executor.hpp"
#include "servable.hpp"
#include <vector>
namespace ovms {
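// The executor owns a VLMPipeline and a mutex-guarded FIFO of execution contexts;
// a single worker thread (VisualLanguageModelLegacyExecutorWrapper::run) drains the
// queue, so generate() calls on the pipeline are serialized.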
VisualLanguageModelLegacyExecutor::VisualLanguageModelLegacyExecutor(std::shared_ptr<ov::genai::VLMPipeline> pipe) {
    this->pipe = std::move(pipe);
}

bool VisualLanguageModelLegacyExecutor::hasRequests() {
    return !requests.empty();
}

size_t VisualLanguageModelLegacyExecutor::requestsQueueSize() {
    return requests.size();
}

void VisualLanguageModelLegacyExecutor::processRequest() {
    OVMS_PROFILE_FUNCTION();
    // Only this worker thread consumes the queue, so front() stays valid until pop() below;
    // the lock is taken just for the pop.
    auto& requestExecutionContext = requests.front();
    if (requestExecutionContext->clientDisconnected) {
        requestExecutionContext->success = false;
        SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected, skipping request processing.");
    } else {
        SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation started");
        try {
            /*
            absl::Status GenAiServable::applyLoraAdapter(std::shared_ptr<GenAiServableExecutionContext>& executionContext) {
                const auto& request = executionContext->apiHandler->getRequest();
                if (request.loraAdapter.has_value()) {
                    auto props = getProperties();
                    auto it = props->adaptersByName.find(request.loraAdapter.value());
                    if (it == props->adaptersByName.end()) {
                        SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "Unknown LoRA adapter requested: {}", request.loraAdapter.value());
                        return absl::InvalidArgumentError("Unknown LoRA adapter: " + request.loraAdapter.value());
                    }
                    float alpha = props->adapterConfig.get_alpha(it->second);
                    executionContext->generationConfigBuilder->getConfig().adapters =
                        ov::genai::AdapterConfig(it->second, alpha);
                }
                return absl::OkStatus();
            }
            */
            requestExecutionContext->results = pipe->generate(
                requestExecutionContext->inputText,
                requestExecutionContext->inputImages,
                requestExecutionContext->generationConfigBuilder->getConfig(),
                requestExecutionContext->textStreamer);
        } catch (const std::exception& e) {
            requestExecutionContext->success = false;
            SPDLOG_LOGGER_ERROR(llm_executor_logger, "VLM pipeline generation failed: {}.", e.what());
        }
        SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended");
    }
    // Signal completion (on success, failure, and disconnect alike) before removing the request.
    requestExecutionContext->readySignal.set_value();
    requestExecutionContext->executionInProgress.notify_one();
    std::unique_lock<std::mutex> lock(queueMutex);
    requests.pop();
}

void VisualLanguageModelLegacyExecutor::waitForRequests(std::atomic<bool>* receivedEndSignal) {
    std::unique_lock<std::mutex> lock(queueMutex);
    // The predicate guards against spurious wakeups: proceed only when work arrived or shutdown was requested.
    cv.wait(lock, [this, receivedEndSignal] { return (!requests.empty() || *receivedEndSignal); });
}

void VisualLanguageModelLegacyExecutor::addRequest(std::shared_ptr<VisualLanguageModelLegacyServableExecutionContext> request) {
    std::lock_guard<std::mutex> guard(queueMutex);
    requests.push(std::move(request));
    cv.notify_one();
}

void VisualLanguageModelLegacyExecutor::notify() {
    std::unique_lock<std::mutex> lock(queueMutex);
    cv.notify_one();
}
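
// The wrapper below owns the worker thread. run() loops until receivedEndSignal is
// set (by the destructor), processing queued requests one at a time.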
void VisualLanguageModelLegacyExecutorWrapper::run(VisualLanguageModelLegacyExecutor* executor, std::atomic<bool>* receivedEndSignal) {
    // TODO add metrics
    while (!(*receivedEndSignal)) {
        try {
            SPDLOG_LOGGER_INFO(llm_executor_logger, "All requests: {};", executor->requestsQueueSize());
            if (executor->hasRequests()) {
                executor->processRequest();
            } else {
                executor->waitForRequests(receivedEndSignal);
            }
        } catch (const std::exception& e) {
            SPDLOG_LOGGER_ERROR(llm_executor_logger, "Error occurred in LLM executor: {}.", e.what());
            // An exception escaping the loop body is treated as unrecoverable: terminate the process.
            exit(1);
        }
    }
}

VisualLanguageModelLegacyExecutorWrapper::VisualLanguageModelLegacyExecutorWrapper(std::shared_ptr<ov::genai::VLMPipeline> pipe) :
    executor(std::move(pipe)) {
    executorThread = std::thread(VisualLanguageModelLegacyExecutorWrapper::run, &executor, &finishExecutorThread);
}

VisualLanguageModelLegacyExecutorWrapper::~VisualLanguageModelLegacyExecutorWrapper() {
    // Signal shutdown, wake the worker if it is blocked in waitForRequests(), then join.
    finishExecutorThread = true;
    executor.notify();
    executorThread.join();
}

void VisualLanguageModelLegacyExecutorWrapper::addRequest(std::shared_ptr<VisualLanguageModelLegacyServableExecutionContext> request) {
    executor.addRequest(std::move(request));
}
} // namespace ovms
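
// A minimal usage sketch, under assumptions: the execution context exposes
// readySignal as a std::promise<void> (consistent with set_value() above), and the
// pipeline is built from a model directory and device name as in OpenVINO GenAI.
// The real wiring lives in the servable/calculator code; the model path below and
// the context construction are hypothetical, illustrative only.
//
//     auto pipe = std::make_shared<ov::genai::VLMPipeline>("/models/vlm", "CPU");  // hypothetical path
//     ovms::VisualLanguageModelLegacyExecutorWrapper wrapper(pipe);  // spawns the worker thread
//     auto ctx = std::make_shared<ovms::VisualLanguageModelLegacyServableExecutionContext>(/* ... */);
//     auto done = ctx->readySignal.get_future();
//     wrapper.addRequest(ctx);  // enqueue; the worker calls pipe->generate(...)
//     done.wait();              // unblocked by readySignal.set_value() in processRequest()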