Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 115 additions & 54 deletions src/Backends/ApMonBackend.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
#include "ApMonBackend.h"
#include <iostream>
#include <sstream>
#include <vector>
#include <map>
#include <unistd.h>
#include <limits.h>
#include <cstdlib>
#include "../MonLogger.h"
#include "../Exceptions/MonitoringException.h"

Expand Down Expand Up @@ -59,69 +64,125 @@ void ApMonBackend::addGlobalTag(std::string_view /*name*/, std::string_view valu
mEntity += value;
}

void ApMonBackend::send(const Metric& metric)
std::string ApMonBackend::getNodeName()
{
std::string name = metric.getName();
std::string entity = mEntity;
for (const auto& [key, value] : metric.getTags()) {
entity += ',';
entity += tags::TAG_KEY[key];
entity += '=';
(value > 0) ? entity += tags::GetValue(value) : entity += std::to_string(0 - value);
const char* env_p = std::getenv("ALIEN_PROC_ID");
if (env_p) {
return std::string(env_p);
}
if (mRunNumber != 0) entity += (",run=" + std::to_string(mRunNumber));

int valueSize = metric.getValuesSize();
char **paramNames, **paramValues;
int* valueTypes;
paramNames = (char**)std::malloc(valueSize * sizeof(char*));
paramValues = (char**)std::malloc(valueSize * sizeof(char*));
valueTypes = (int*)std::malloc(valueSize * sizeof(int));
// the scope of values must be the same as sendTimedParameters method
int intValue;
double doubleValue;
std::string stringValue;

auto& values = metric.getValues();

for (int i = 0; i < valueSize; i++) {
paramNames[i] = const_cast<char*>(values[i].first.c_str());
std::visit(overloaded{
[&](int value) {
valueTypes[i] = XDR_INT32;
intValue = value;
paramValues[i] = reinterpret_cast<char*>(&intValue);
},
[&](double value) {
valueTypes[i] = XDR_REAL64;
doubleValue = value;
paramValues[i] = reinterpret_cast<char*>(&doubleValue);
},
[&](const std::string& value) {
valueTypes[i] = XDR_STRING;
stringValue = value;
paramValues[i] = const_cast<char*>(stringValue.c_str());
},
[&](uint64_t value) {
valueTypes[i] = XDR_REAL64;
doubleValue = static_cast<double>(value);
paramValues[i] = reinterpret_cast<char*>(&doubleValue);
},
}, values[i].second);

char hostname[HOST_NAME_MAX];
if (gethostname(hostname, sizeof(hostname)) == 0) {
hostname[sizeof(hostname) - 1] = '\0';
return std::string(hostname);
}

mApMon->sendTimedParameters(const_cast<char*>(name.c_str()), const_cast<char*>(entity.c_str()),
valueSize, paramNames, valueTypes, paramValues, convertTimestamp(metric.getTimestamp()));
MonLogger::Get(Severity::Error) << "Failed to get hostname, using 'unknown'" << MonLogger::End();
return "unknown";
}

void ApMonBackend::sendBatch(const std::vector<reference_wrapper<const Metric>>& metrics)
{
std::string clusterName(mClusterName);
std::string nodeName = getNodeName();

int totalValues = 0;
for (const auto& metric : metrics) {
totalValues += metric.get().getValuesSize();
}
const int totalParams = totalValues * 2;
std::vector<int> intValues;
std::vector<double> doubleValues;
std::vector<std::string> stringValues;
std::vector<char*> paramNames;
std::vector<char*> paramValues;
std::vector<int> valueTypes;

intValues.reserve(totalValues);
doubleValues.reserve(totalValues);
stringValues.reserve(metrics.size() * 3 + totalValues);
paramNames.reserve(totalParams);
paramValues.reserve(totalParams);
valueTypes.reserve(totalParams);

for (const auto& metric : metrics) {
std::string entity = mEntity;
for (const auto& [key, value] : metric.get().getTags()) {
entity += ',';
entity += tags::TAG_KEY[key];
entity += '=';
(value > 0) ? entity += tags::GetValue(value) : entity += std::to_string(0 - value);
}
if (mRunNumber != 0) entity += (",run=" + std::to_string(mRunNumber));

auto& values = metric.get().getValues();
const int valueSize = metric.get().getValuesSize();

const std::string_view metricName = metric.get().getName();
stringValues.emplace_back(metricName);
const char* metriNamePtr = stringValues.back().c_str();
stringValues.emplace_back(std::string(metricName) + "_src");
const char* metriNameSrcPtr = stringValues.back().c_str();
stringValues.push_back(std::move(entity));
const char* entityPtr = stringValues.back().c_str();

for (int i = 0; i < valueSize; ++i) {
paramNames.push_back(const_cast<char*>(metriNamePtr));
std::visit(overloaded{
[&](int value) {
valueTypes.push_back(XDR_INT32);
intValues.push_back(value);
paramValues.push_back(reinterpret_cast<char*>(&intValues.back()));
},
[&](double value) {
valueTypes.push_back(XDR_REAL64);
doubleValues.push_back(value);
paramValues.push_back(reinterpret_cast<char*>(&doubleValues.back()));
},
[&](const std::string& value) {
valueTypes.push_back(XDR_STRING);
stringValues.push_back(value);
paramValues.push_back(const_cast<char*>(stringValues.back().c_str()));
},
[&](uint64_t value) {
valueTypes.push_back(XDR_REAL64);
doubleValues.push_back(static_cast<double>(value));
paramValues.push_back(reinterpret_cast<char*>(&doubleValues.back()));
},
}, values[i].second);

std::free(paramNames);
std::free(paramValues);
std::free(valueTypes);
paramNames.push_back(const_cast<char*>(metriNameSrcPtr));
valueTypes.push_back(XDR_STRING);
paramValues.push_back(const_cast<char*>(entityPtr));
}
}

mApMon->sendTimedParameters(
const_cast<char*>(clusterName.c_str()),
const_cast<char*>(nodeName.c_str()),
totalParams, paramNames.data(), valueTypes.data(), paramValues.data(),
convertTimestamp(metrics[0].get().getTimestamp())
);
}

void ApMonBackend::send(const Metric& metric)
{
sendBatch(std::vector<std::reference_wrapper<const Metric>>{std::cref(metric)});
}

void ApMonBackend::send(std::vector<Metric>&& metrics)
{
for (auto& metric : metrics) {
send(metric);
if (metrics.empty()) {
return;
}

std::map<int, std::vector<std::reference_wrapper<const Metric>>> metricsByTimestamp;
for (const auto& metric : metrics) {
metricsByTimestamp[convertTimestamp(metric.getTimestamp())].push_back(std::cref(metric));
}

for (const auto& [timestamp, metricsGroup] : metricsByTimestamp) {
sendBatch(metricsGroup);
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/Backends/ApMonBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <chrono>
#include <memory>
#include <functional>

namespace o2
{
Expand Down Expand Up @@ -63,13 +64,23 @@ class ApMonBackend final : public Backend
void addGlobalTag(std::string_view name, std::string_view value) override;

private:
/// Sends batch of metrics
/// \param metrics vector of metrics
void sendBatch(const std::vector<std::reference_wrapper<const Metric>>& metrics);

/// Converts timestamp to format supported by ApMonBackend
/// \param timestamp timestamp in std::chrono::time_point format
/// \return timestamp as integer (milliseconds from epoch)
int convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp);

/// Gets node name
/// It looks for environment variable ALIEN_PROC_ID and if it is not set, it uses hostname as node name
/// \return node name as string
std::string getNodeName();

std::unique_ptr<ApMon> mApMon; ///< ApMon object
std::string mEntity; ///< MonALISA entity, created out of global tags
inline static constexpr std::string_view mClusterName = "O2Monitoring_Nodes"; ///< MonALISA cluster name
};

} // namespace backends
Expand Down