Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 14 additions & 84 deletions src/dataprocessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,18 @@
import { dbManager } from './dbmanager.js';
import { signalRegistry } from './signalregistry.js';

/**
* DataProcessor Module
* Handles telemetry data parsing, chronological sorting, and state synchronization.
*/
class DataProcessor {
SCHEMA_REGISTRY = {
JSON: { signal: 's', timestamp: 't', value: 'v' },
CSV: {
signal: 'SensorName',
timestamp: 'Time_ms',
value: 'Reading',
},
CSV: { signal: 'SensorName', timestamp: 'Time_ms', value: 'Reading' },
};

SCHEMA = {
timeKey: 'x',
valueKey: 'y',
};
SCHEMA = { timeKey: 'x', valueKey: 'y' };

constructor() {
this.handleLocalFile = this.handleLocalFile.bind(this);
}

/**
* Initializes anomaly detection templates.
*/
async loadConfiguration(providedTemplates = templates) {
try {
if (!providedTemplates) {
Expand All @@ -43,9 +29,7 @@
console.error('Config Loader:', error);
try {
Config.ANOMALY_TEMPLATES = {};
} catch (e) {
/* ignore */
}
} catch (e) {}

Check warning on line 32 in src/dataprocessor.js

View workflow job for this annotation

GitHub Actions / validate_and_build

'e' is defined but never used

Check warning on line 32 in src/dataprocessor.js

View workflow job for this annotation

GitHub Actions / validate_and_build

'e' is defined but never used
}
}

Expand All @@ -62,7 +46,6 @@
files.forEach(async (file) => {
try {
const fileText = await this.#readFileContent(file);

let rawData;
if (file.name.includes('.csv')) {
const parsedCSV = this.#parseCSV(fileText);
Expand All @@ -72,10 +55,8 @@
rawData = this.#normalizeWideCSV(parsedCSV);
}
} else {
// Pass the raw JSON straight to process; it will detect columnar internally
rawData = JSON.parse(fileText);
}

await this.#process(rawData, file.name);
} catch (err) {
const msg = `Error parsing ${file.name}: ${err.message}`;
Expand All @@ -94,7 +75,6 @@
const decompressedStream = file.stream().pipeThrough(ds);
return await new Response(decompressedStream).text();
}

return new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = (e) => resolve(e.target.result);
Expand All @@ -103,8 +83,6 @@
});
}

// --- Data Transformation & State Sync ---

async process(data, fileName) {
const result = await this.#process(data, fileName);
this.#finalizeBatchLoad();
Expand All @@ -115,7 +93,6 @@
try {
let telemetryData = data;

// Auto-detect and unpack the highly compressed columnar format
if (this.#isColumnarJSON(telemetryData)) {
telemetryData = this.#normalizeColumnarJSON(telemetryData);
}
Expand All @@ -138,11 +115,9 @@
}

const schema = this.#detectSchema(telemetryPoints[0]);

const processedPoints = telemetryPoints.flatMap((item) =>
this.#applyMappingAndCleaning(item, schema)
);

const result = this.#transformRawData(processedPoints, fileName);

result.metadata = fileMetadata;
Expand All @@ -159,14 +134,10 @@
);
result.dbId = existingFile.id;
} else {
const dbId = await dbManager.saveTelemetry(result);
result.dbId = dbId;
result.dbId = await dbManager.saveTelemetry(result);
}

const isAlreadyInSession = AppState.files.some(
(f) => f.dbId === result.dbId
);
if (!isAlreadyInSession) {
if (!AppState.files.some((f) => f.dbId === result.dbId)) {
AppState.files.push(result);
}

Expand All @@ -176,7 +147,6 @@
size: result.size,
metadata: result.metadata,
});

return result;
} catch (error) {
console.error('Error occured during file processing', error);
Expand All @@ -195,43 +165,27 @@

#normalizeColumnarJSON(data) {
const normalized = [];

if (data.metadata) {
normalized.push({ metadata: data.metadata });
}
if (data.metadata) normalized.push({ metadata: data.metadata });

const dictionary = data.signal_dictionary || {};
const series = data.series || {};

// Pre-compute canonical names from the dictionary to avoid lookups in the loop
const mappedDictionary = {};
for (const [id, rawLocalizedName] of Object.entries(dictionary)) {
const nameFromId = signalRegistry.getCanonicalByPid(id);

for (const [id, rawLocalizedName] of Object.entries(dictionary)) {
mappedDictionary[id] =
nameFromId ||
signalRegistry.getCanonicalKey(rawLocalizedName) ||
rawLocalizedName;
signalRegistry.getCanonicalByPid(id) || rawLocalizedName || `PID ${id}`;
}

// Iterate through the series
for (const [signalId, vectors] of Object.entries(series)) {
const signalName = mappedDictionary[signalId] || signalId;

const signalName = mappedDictionary[signalId];
const times = vectors.t || [];
const values = vectors.v || [];

const length = Math.min(times.length, values.length);

for (let i = 0; i < length; i++) {
normalized.push({
s: signalName,
t: times[i],
v: values[i],
});
normalized.push({ s: signalName, t: times[i], v: values[i] });
}
}

return normalized;
}

Expand All @@ -240,7 +194,6 @@
const keys = Object.keys(rows[0]);
const hasTimeColumn = keys.includes('Time');
const firstTimeValue = rows[0]['Time'];

return (
hasTimeColumn &&
typeof firstTimeValue === 'string' &&
Expand All @@ -251,22 +204,19 @@
#normalizeAlfaOBD(rows) {
const normalized = [];
if (!rows || rows.length === 0) return normalized;

const keys = Object.keys(rows[0]);
const timeKey = 'Time';
const signalKeys = keys.filter((k) => k !== timeKey);

rows.forEach((row) => {
const rawTime = row[timeKey];
if (!rawTime) return;

const parts = rawTime.split(':');
if (parts.length !== 3) return;

const hours = parseInt(parts[0], 10);
const minutes = parseInt(parts[1], 10);
const seconds = parseFloat(parts[2]);

if (isNaN(hours) || isNaN(minutes) || isNaN(seconds)) return;

const timestampMs = (hours * 3600 + minutes * 60 + seconds) * 1000;
Expand All @@ -282,7 +232,6 @@
}
});
});

return normalized;
}

Expand All @@ -301,24 +250,17 @@
if (isNaN(timestamp)) return [];

let prefix = '';
if (typeof baseSignal === 'string') {
if (typeof baseSignal === 'string')
prefix = baseSignal.replace(/\n/g, ' ').trim();
}

if (typeof rawValue === 'object' && rawValue !== null) {
const derivedPoints = [];

for (const [key, val] of Object.entries(rawValue)) {
const numVal = Number(val);
if (isNaN(numVal)) continue;

const formattedKey = key.charAt(0).toUpperCase() + key.slice(1);
const finalSignal = prefix
? `${prefix}-${formattedKey}`
: formattedKey;

derivedPoints.push({
signal: finalSignal,
signal: prefix ? `${prefix}-${formattedKey}` : formattedKey,
timestamp: timestamp,
value: numVal,
});
Expand All @@ -345,9 +287,7 @@
#parseCSV(csvText) {
const lines = csvText.split('\n').filter((line) => line.trim());
if (lines.length === 0) return [];

const headers = lines[0].split(',').map((h) => h.trim());

return lines.slice(1).map((line) => {
const values = line.split(',');
return headers.reduce((obj, header, i) => {
Expand All @@ -359,15 +299,12 @@

#normalizeWideCSV(rows) {
if (!rows || rows.length === 0) return rows;

const keys = Object.keys(rows[0]);

if (
keys.includes('SensorName') &&
(keys.includes('Time_ms') || keys.includes('time'))
) {
)
return rows;
}

const timeKey = keys.find((k) => k.toLowerCase().includes('time'));
if (!timeKey) return rows;
Expand All @@ -380,7 +317,6 @@
if (isNaN(timeVal)) return;

const timestampMs = timeKey.includes('(s)') ? timeVal * 1000 : timeVal;

signalKeys.forEach((sigKey) => {
const val = row[sigKey];
if (val !== '' && val !== null && val !== undefined) {
Expand All @@ -392,7 +328,6 @@
}
});
});

return normalized;
}

Expand All @@ -406,12 +341,7 @@

sorted.forEach((p) => {
if (!signals[p.signal]) signals[p.signal] = [];

signals[p.signal].push({
[timeKey]: p.timestamp,
[valueKey]: p.value,
});

signals[p.signal].push({ [timeKey]: p.timestamp, [valueKey]: p.value });
if (p.timestamp < minT) minT = p.timestamp;
if (p.timestamp > maxT) maxT = p.timestamp;
});
Expand Down
Loading
Loading