Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ coverage/lcov.info
config/config.js


.vscode/
120 changes: 0 additions & 120 deletions .vscode/launch.json

This file was deleted.

3 changes: 3 additions & 0 deletions lib/constructed-queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ module.exports = {
end_timestamp, parameter, qualifier, units, post_process, subtract,
por_max_value, station_type, percentile_5, data_type, period)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
ON CONFLICT (station, region, COALESCE(qualifier, ''), start_timestamp, end_timestamp)
WHERE lower(parameter) = 'rainfall'
DO NOTHING
RETURNING telemetry_value_parent_id
`
return Query.fromString(sql, values)
Expand Down
141 changes: 82 additions & 59 deletions lib/models/rloi.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const util = require('../helpers/util')
const directly = require('directly')

// telemetry parameters to import (Water level is if rloi station exists)
const parameters = ['Rainfall']
const parameters = new Set(['Rainfall'])

const processedValueDecimalPlace = 3

Expand Down Expand Up @@ -90,6 +90,74 @@ function setSlsTelemetryParent (key, station, item, setOfValues) {
]
}

function normaliseStationRegion (item) {
  // Telemetry files can carry a slightly different region name to the station
  // data, so preserve the raw value and map onto the canonical station region.
  const rawRegion = item.$.region
  item.$.telemetryRegion = rawRegion
  item.$.region = regions[rawRegion] || rawRegion
}

async function getStationOrNull (s3, bucket, item) {
  // Resolve the station metadata file for this telemetry item from S3,
  // returning the parsed JSON or null when no matching object exists.
  const stationKey = `rloi/${item.$.region}/${item.$.stationReference}/station.json`
  try {
    const response = await fetchStation(s3, bucket, stationKey)
    const body = await response.Body.transformToString()
    return JSON.parse(body)
  } catch (err) {
    // Intentionally quiet: a large number of telemetry objects have no
    // matching station file, so a failed fetch is the expected common case.
    return null
  }
}

function shouldProcessValues (setOfValues, station) {
  // Whitelisted parameters are always imported; 'Water Level' is imported
  // only when a matching station record exists (fsr-123).
  const { parameter } = setOfValues.$
  if (parameters.has(parameter)) {
    return true
  }
  return station && parameter === 'Water Level'
}

async function ensureStation (client, item, station) {
  // Return the known station, or — for non-RLOI telemetry — upsert the
  // station details and return a placeholder with RLOI-specific defaults.
  if (station) {
    return station
  }

  // No matching station: record it in u_flood.sls_telemetry_station first.
  await client.query('slsTelemetryStation', setSlsTelemetryStation(item))

  // Dummy values standing in for RLOI-specific fields.
  return {
    RLOI_ID: -1,
    Region: item.$.region,
    Post_Process: 'n',
    Station_Type: 'R'
  }
}

function logDuplicateParentSkipped (station, setOfValues) {
  // Surface skipped duplicates so ON CONFLICT DO NOTHING hits can be traced
  // in the logs without inspecting the database.
  const details = setOfValues.$
  const parts = [
    `Duplicate telemetry parent skipped: station=${station.RLOI_ID}`,
    `parameter=${details.parameter}`,
    `qualifier=${details.qualifier || ''}`,
    `start=${details.startDate}T${details.startTime}`,
    `end=${details.endDate}T${details.endTime}`
  ]
  console.log(parts.join(' '))
}

function setSlsTelemetryValues (res, station, setOfValues) {
  // Build one telemetry value row per reading in this set of values.
  const rows = []
  for (let index = 0; index < setOfValues.Value.length; index++) {
    rows.push(setSlsTelemetryValueItem(index, res, station, setOfValues))
  }
  return rows
}

async function processSetOfValues ({ client, s3, bucket, key, item, setOfValues }) {
  // Import a single SetofValues: resolve (or default) the station, insert the
  // parent record, then bulk-insert the individual readings.
  const existingStation = await getStationOrNull(s3, bucket, item)

  if (!shouldProcessValues(setOfValues, existingStation)) {
    return
  }

  const station = await ensureStation(client, item, existingStation)

  const parentRecord = setSlsTelemetryParent(key, station, item, setOfValues)
  const parentResult = await client.query('slsTelemetryValueParent', parentRecord)

  // ON CONFLICT DO NOTHING yields no RETURNING row when the parent already
  // exists, so a missing row means this import is a duplicate.
  if (!parentResult.rows?.[0]) {
    logDuplicateParentSkipped(station, setOfValues)
    return
  }

  const valueRows = setSlsTelemetryValues(parentResult, station, setOfValues)
  await client.query('slsTelemetryValues', valueRows)
}

module.exports = {
async save (value, bucket, key, client, s3) {
let processed = 0
Expand All @@ -104,64 +172,19 @@ module.exports = {
}

for (const item of value.EATimeSeriesDataExchangeFormat.Station) {
if (item.SetofValues) {
// Update region to match station region as telemetry file region slightly differs,
// so keep consistent with station data
item.$.telemetryRegion = item.$.region
item.$.region = regions[item.$.region] ? regions[item.$.region] : item.$.region

await directly(concurrence, item.SetofValues.map(setOfValues => async () => {
let station

try {
const result = await fetchStation(s3, bucket, `rloi/${item.$.region}/${item.$.stationReference}/station.json`)
const bodyContents = await result.Body.transformToString()
station = JSON.parse(bodyContents)
} catch (err) {
// the console log is commented out so as not to spam the cloudwatch lambda
// logging, as the s3.getObject throws an error when it can't find the object, and there
// are a significant number of telemetry objects that we don't have a matching station
// for, and also hence the need to catch the error here.
// console.log({ err })
}

// only process the values if parameter in array or we have a station and Water level match (fsr-123)
if (parameters.indexOf(setOfValues.$.parameter) > -1 || (station && setOfValues.$.parameter === 'Water Level')) {
// if not rloi then Upsert station details to u_flood.sls_telemetry_station, with defaults for rloi specific values
if (!station) {
// dummy values
station = {
RLOI_ID: -1,
Region: item.$.region,
Post_Process: 'n',
Station_Type: 'R'
}

const telemetryStation = setSlsTelemetryStation(item)

await client.query('slsTelemetryStation', telemetryStation)
}

// Store parent details in sls_telemetry_value_parent
const parent = setSlsTelemetryParent(key, station, item, setOfValues)

const res = await client.query('slsTelemetryValueParent', parent)
const values = []
// console.log(`Loaded parent: ${station.RLOI_ID} | ${setOfValues.$.parameter} | ${setOfValues.$.qualifier}`)

for (let i = 0; i < setOfValues.Value.length; i++) {
values[i] = setSlsTelemetryValueItem(i, res, station, setOfValues)
}

// Note: this previously passed a single parameter as a query built using sql-node
await client.query('slsTelemetryValues', values)
// console.log(`Loaded station values: ${station.RLOI_ID} | ${setOfValues.$.parameter} | ${setOfValues.$.qualifier}`)
}
processed++
}))
if (processed === valuesCount) {
console.log('all values processed')
}
if (!item.SetofValues) {
continue
}

normaliseStationRegion(item)

await directly(concurrence, item.SetofValues.map(setOfValues => async () => {
await processSetOfValues({ client, s3, bucket, key, item, setOfValues })
processed++
}))

if (processed === valuesCount) {
console.log('all values processed')
}
}
},
Expand Down
59 changes: 59 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,65 @@ Uses the exports context file from sharepoint to update the stations in the data
This will use [localstack](https://docs.localstack.cloud/) and the supporting
files and documentation to follow.

## Local Debugging - RLOI Process

To test the `rloi-process` Lambda locally against a real database, use the debug runner:

### Setup

1. Ensure your local database is running (PostgreSQL at `127.0.0.1:5433`)
2. Set the database connection:
```bash
export LFW_DATA_DB_CONNECTION=postgres://u_flood:secret@127.0.0.1:5433/temp_flood_db
```

### Running Single Debug Session

```bash
# Automatically generates a unique station reference each run
DEBUG_XML_FILE="./test/data/rloi-test.xml" node test/debug-rloi-local.js

# Or use a fixed station reference to test idempotency
DEBUG_STATION_REF="TEST_STATION_001" DEBUG_XML_FILE="./test/data/rloi-test.xml" node test/debug-rloi-local.js
```

### Testing Duplicate Detection

To verify that `ON CONFLICT DO NOTHING` correctly prevents duplicate rainfall inserts across separate XML files:

```bash
bash test/test-duplicate-detection.sh
```

This script runs two consecutive imports with the same station reference and rainfall data. Expected behavior:
- **Run 1**: Rainfall parent inserted (1 row affected)
- **Run 2**: Duplicate blocked, logged as: `Duplicate telemetry parent skipped: ...`

### Using VS Code Debugger

Add this configuration to `.vscode/launch.json` (not committed to git):

```json
{
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "debug rloiProcess (local DB)",
"program": "${workspaceFolder}/test/debug-rloi-local.js",
"env": {
"LFW_DATA_DB_CONNECTION": "postgres://u_flood:secret@127.0.0.1:5433/temp_flood_db",
"DEBUG_STATION_REF": "DEBUG_TEST_001",
"DEBUG_XML_FILE": "./test/data/rloi-test.xml"
}
}
]
}
```

Then press `F5` to start debugging with breakpoints.

## Deployment

This is installed using terraforms/terragrunt which is managed by WebOps
Expand Down
15 changes: 15 additions & 0 deletions test/data/rloi-test-rainfall-duplicate.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" standalone="yes"?>
<EATimeSeriesDataExchangeFormat xmlns="http://www.environment-agency.gov.uk/XMLSchemas/EATimeSeriesDataExchangeFormat" xmlns:md="http://www.environment-agency.gov.uk/XMLSchemas/EAMetadataFormat" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.environment-agency.gov.uk/XMLSchemas/EATimeSeriesDataExchangeFormat EATimeSeriesDataExchangeFormat.xsd">
<md:Publisher>Environment Agency</md:Publisher>
<md:Source>EA South East Regional Telemetry System (Duplicate)</md:Source>
<md:Description>Automated telemetry data export process</md:Description>
<md:Creator>SCX 6</md:Creator>
<md:Date>2018-06-29</md:Date>
<md:Time>11:03:48</md:Time>
<md:Identifier>SEKMHSCXAS1-DUP</md:Identifier>
<Station stationReference="1" region="TestRegion" stationName="Langley Bottom" ngr="SJ949714">
<SetofValues parameter="Rainfall" qualifier="Tipping Bucket Raingauge" dataType="Total" period="15 min" units="mm" startDate="2021-01-12" startTime="10:30:00" endDate="2021-01-12" endTime="10:30:00">
<Value date="2021-01-12" time="10:30:00">0.0</Value>
</SetofValues>
</Station>
</EATimeSeriesDataExchangeFormat>
Loading
Loading