-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathprocessMessage.js
More file actions
225 lines (197 loc) · 9 KB
/
processMessage.js
File metadata and controls
225 lines (197 loc) · 9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
'use strict'
const xml2js = require('xml2js')
const moment = require('moment')
const service = require('../helpers/service')
const eventSchema = require('../schemas/processMessageEventSchema')
const aws = require('../helpers/aws')
const { validateXML } = require('xmllint-wasm')
const fs = require('node:fs')
const path = require('node:path')
const xsdSchema = fs.readFileSync(path.join(__dirname, '..', 'schemas', 'CAP-v1.2.xsd'), 'utf8')
const additionalCapMessageSchema = require('../schemas/additionalCapMessageSchema')
const Message = require('../models/message')
// Prefix of the v2 message identifier built in processMessageV2
// (`${EA_WHO}.<formatted sent timestamp>.<original identifier>`).
// NOTE(review): looks like an OID-style authority id for the sender - confirm.
const EA_WHO = '2.49.0.1.826.1'
// Value written to the `code` field of every v2 message in processMessageV2.
const CODE = 'MCP:v2.0'
const severityV2Mapping = require('../models/v2MessageMapping')
const redis = require('../helpers/redis')
const meteoalarm = require('../helpers/meteoalarm')
/**
 * Handles an incoming CAP message event end to end.
 *
 * Steps, in order:
 *  1. validate the event against `eventSchema` and build a `Message` from `event.bodyXml`;
 *  2. look up the last stored message for the same `fwisCode`;
 *  3. force `status` to 'Test' whenever `process.env.stage` is not 'prd';
 *  4. attach references built from the last message (if any);
 *  5. derive the v2 message via `processMessageV2`;
 *  6. validate both messages against the XSD and the additional Joi schema;
 *  7. store the message via `service.putMessage` and `redis.set`, and post the v2
 *     message to Meteoalarm unless disabled or the message is not a warning.
 *
 * @param {object} event - Incoming event; `event.bodyXml` carries the CAP XML.
 * @returns {Promise<{statusCode: number, body: object}>} A 200 response describing
 *   the stored message on success.
 * @throws Any failure is logged and handed to `processFailedMessage`, which always
 *   rethrows, so the returned promise rejects on failure.
 */
module.exports.processMessage = async (event) => {
  console.log('[processMessage] Event received:', event)
  try {
    // validate the event
    await eventSchema.validateAsync(event)

    // parse the xml
    const message = new Message(event.bodyXml)
    console.log(`[processMessage] Processing CAP message: ${message.identifier} for ${message.fwisCode}`)

    // get Last message
    const dbResult = await service.getLastMessage(message.fwisCode)
    const lastMessage = (!!dbResult && dbResult.rows.length > 0) ? dbResult.rows[0] : undefined
    if (lastMessage) {
      console.log(`[processMessage] Found last message for ${message.fwisCode}: identifier=${lastMessage.identifier}, expires=${lastMessage.expires}, status=${lastMessage.status}`)
    } else {
      console.log(`[processMessage] No previous message found for ${message.fwisCode}`)
    }

    // If not production set status to test
    if (process.env.stage !== 'prd') {
      message.status = 'Test'
    }
    console.log(`[processMessage] Running in stage: ${process.env.stage}, message status set to: ${message.status}`)

    // Add in the references field and update msgtype to Update if references exist and is Alert (does this in message model)
    const references = buildReference(lastMessage, message.sender, 'identifier', 'references')
    if (references) {
      message.references = references
      console.log(`[processMessage] Built references: ${references}, msgType updated to: ${message.msgType}`)
    }

    // Generate message V2 for meteoalarm spec
    const messageV2 = processMessageV2(message, lastMessage)

    // do validation against OASIS CAP xml schema and extended JOI schema
    console.log(`[processMessage] Starting validation for ${message.identifier}`)
    const validationStart = Date.now()
    // allSettled (not all) so that every validator reports, not just the first to fail
    const results = await Promise.allSettled([
      validateAgainstXsdSchema(message),
      validateAgainstJoiSchema(message),
      validateAgainstXsdSchema(messageV2),
      validateAgainstJoiSchema(messageV2)
    ])
    console.log(`[processMessage] Validation completed in ${Date.now() - validationStart}ms for ${message.identifier}`)

    // Check for validation failures and throw
    // (validators reject with arrays of error details, hence flatMap)
    const errors = results.filter(r => r.status === 'rejected').flatMap(r => r.reason)
    if (errors.length > 0) {
      throw new Error(JSON.stringify(errors))
    }

    const { message: redisMessage, query: dbQuery } = message.putQuery(message, messageV2)

    // store the message in database, redis/elasticache, and post to Meteoalarm
    const storageStart = Date.now()
    const promises = [
      service.putMessage(dbQuery),
      redis.set(redisMessage.identifier, redisMessage)
    ]
    if (process.env.CPX_METEOALARM_DISABLE === 'true') {
      console.log('[processMessage] Meteoalarm integration is disabled')
    } else if (message.isWarning) {
      promises.push(meteoalarm.postWarning(messageV2.toString(), message.identifier))
    } else {
      console.log('[processMessage] Meteoalarm integration disabled for alert area')
    }
    await Promise.all(promises)
    console.log(`[processMessage] Storage operations completed in ${Date.now() - storageStart}ms`)
    console.log(`[processMessage] DB write successful for ${message.identifier}`)
    console.log(`[processMessage] Redis cache set for ${message.identifier}`)
    console.log(`[processMessage] Finished processing CAP message: ${message.identifier} for ${message.fwisCode}`)

    return {
      statusCode: 200,
      body: {
        message: `Cap message successfully stored for ${message.fwisCode}`,
        identifier: message.identifier,
        fwisCode: message.fwisCode,
        sent: message.sent,
        expires: message.expires,
        status: message.status
      }
    }
  } catch (err) {
    // Actual error will be handled by lambda process
    // So just log the message body to console for investigation
    console.error(`[processMessage] Error during processing: ${err.message}`)
    console.error(`[processMessage] Error stack: ${err.stack}`)
    // A null/undefined event is one of the ways to land here, so read the body
    // defensively: a TypeError raised inside this handler would mask the original
    // error and skip the failure notification.
    const failedBodyXml = event?.bodyXml
    console.log('[processMessage] Failed message body:', failedBodyXml)
    return processFailedMessage(err, failedBodyXml)
  }
}
/**
 * Reports a failed message and then rethrows; this function never resolves.
 *
 * Without `process.env.CPX_SNS_TOPIC` the original error is rethrown untouched
 * (legacy behaviour). With a topic configured, a notification is published via
 * `aws.email.publishMessage`; afterwards either the original error or, if
 * publishing itself failed, the publishing error is rethrown with its message
 * prefixed by `[500] `.
 *
 * @param {Error} originalError - The error that aborted processing.
 * @param {*} xmlResult - The received message body, JSON-stringified into the notification.
 * @returns {Promise<never>}
 * @throws {Error} Always.
 */
const processFailedMessage = async (originalError, xmlResult) => {
  // For backwards compatibility, only send a notification if an AWS SNS topic
  // is configured; otherwise propagate the error as-is.
  if (!process.env.CPX_SNS_TOPIC) {
    throw originalError
  }

  // Assume the original error is what gets rethrown unless notifying fails.
  let failure = originalError
  try {
    const notification = {
      receivedMessage: JSON.stringify(xmlResult),
      errorMessage: originalError.message,
      dateCreated: new Date().toISOString()
    }
    // Send a notification of failed message processing.
    await aws.email.publishMessage(notification)
  } catch (notifyError) {
    failure = notifyError
  }

  failure.message = `[500] ${failure.message}`
  throw failure
}
/**
 * Builds the references string for a new message from the previously stored one.
 *
 * A reference entry is `<sender>,<previous id>,<previous sent time>` with the sent
 * time formatted in UTC by moment. The entry is appended (space separated) to any
 * references the previous message already carried.
 *
 * @param {object|undefined} lastMessage - Previously stored message row, if any.
 *   NOTE(review): `expires` is compared against `new Date()`, so it is presumably a Date - confirm.
 * @param {string} sender - Sender placed at the start of the new entry.
 * @param {string} idField - Name of the property on `lastMessage` holding its identifier.
 * @param {string} refField - Name of the property on `lastMessage` holding its existing references.
 * @returns {string} The references string, or '' when there is no last message or
 *   its `expires` is not later than now.
 */
const buildReference = (lastMessage, sender, idField, refField) => {
  // Only a previous message that has not yet expired is referenced.
  if (!lastMessage || !(lastMessage.expires > new Date())) {
    return ''
  }

  const previousSent = moment(lastMessage.sent).utc().format('YYYY-MM-DDTHH:mm:ssZ')
  const entry = `${sender},${lastMessage[idField]},${previousSent}`

  const existing = lastMessage[refField]
  if (existing) {
    return `${existing} ${entry}`
  }
  return entry
}
/**
 * Validates a message's XML against the XSD held in `xsdSchema` using `validateXML`.
 *
 * @param {{toString: function(): string}} message - Message whose `toString()` yields the XML.
 * @returns {Promise<void>} Resolves when the validator reports no errors.
 * @throws {Array} The validator's `errors` array when it is non-empty (thrown as-is,
 *   not wrapped in an Error).
 */
const validateAgainstXsdSchema = async (message) => {
  const xmlFile = {
    fileName: 'message.xml',
    contents: message.toString()
  }
  const { errors } = await validateXML({ xml: [xmlFile], schema: [xsdSchema] })

  // A missing `errors` property counts as "no errors".
  const hasErrors = errors?.length > 0
  if (hasErrors) {
    throw errors
  }
}
/**
 * Parses a message's XML with xml2js and validates the resulting object against
 * `additionalCapMessageSchema`, collecting all violations (`abortEarly: false`).
 *
 * @param {{toString: function(): string}} message - Message whose `toString()` yields the XML.
 * @returns {Promise<void>} Resolves when the schema reports no error.
 * @throws {Error} The xml2js error when the XML cannot be parsed.
 * @throws {Array} The schema error's `details` when validation fails (thrown as-is).
 */
const validateAgainstJoiSchema = async (message) => {
  // Adapt the callback-style xml2js parser to a promise.
  const parseXml = (xml) => new Promise((resolve, reject) => {
    xml2js.parseString(xml, (parseError, parsed) => {
      if (parseError) {
        reject(parseError)
      } else {
        resolve(parsed)
      }
    })
  })

  const parsedMessage = await parseXml(message.toString())
  const { error } = additionalCapMessageSchema.validate(parsedMessage, { abortEarly: false })
  if (error) {
    throw error.details
  }
}
/**
 * Formats a date as a compact UTC timestamp: `YYYYMMDDHHmmSS`.
 *
 * Month, day, hour, minute and second are left-padded to two digits; the year is
 * emitted as-is. An unparseable input is not rejected - every field becomes `NaN`.
 *
 * @param {string} isoString - Any value accepted by the `Date` constructor.
 * @returns {string} The compact UTC timestamp.
 */
const formatDate = (isoString) => {
  const when = new Date(isoString)
  const twoDigits = (value) => String(value).padStart(2, '0')

  const paddedFields = [
    when.getUTCMonth() + 1, // getUTCMonth is zero-based
    when.getUTCDate(),
    when.getUTCHours(),
    when.getUTCMinutes(),
    when.getUTCSeconds()
  ].map((field) => twoDigits(field))

  return [when.getUTCFullYear(), ...paddedFields].join('')
}
// Generates a new message based on the Meteoalarm specification https://eaflood.atlassian.net/browse/NI-121
/**
 * Derives the v2 variant of a message.
 *
 * The v2 message starts as a copy of the source XML and then has its identifier,
 * code, references, event, response type, severity, onset, headline, instruction
 * and parameters overwritten, with values taken from `severityV2Mapping` for the
 * source severity. The `geocode` node is removed last.
 *
 * @param {Message} message - The source message (not modified here).
 * @param {object|undefined} lastMessage - Previously stored message row, used for v2 references.
 * @returns {Message} The newly built v2 message.
 */
const processMessageV2 = (message, lastMessage) => {
  const messageV2 = new Message(message.toString())

  // The v2 identifier needs both the sent time and the original identifier.
  let identifierV2 = ''
  if (message.sent && message.identifier) {
    identifierV2 = `${EA_WHO}.${formatDate(message.sent)}.${message.identifier}`
  }
  messageV2.identifier = identifierV2
  messageV2.code = CODE

  // Add in the references field and update msgtype to Update if references exist and is Alert (does this in message model)
  const referencesV2 = buildReference(lastMessage, message.sender, 'identifier_v2', 'references_v2')
  if (referencesV2) {
    messageV2.references = referencesV2
  }

  // Mapping entry for the source severity; undefined when that severity is not mapped.
  const mapping = severityV2Mapping[message.severity]
  const areaDesc = () => messageV2.areaDesc

  messageV2.event = `${mapping?.description}: ${areaDesc()}`
  messageV2.responseType = 'Monitor'
  messageV2.severity = mapping?.severity || ''
  messageV2.onset = message.sent
  messageV2.headline = `${mapping?.headline}: ${areaDesc()}`

  const instructionTemplate = mapping?.instruction
  if (instructionTemplate) {
    const { quickdialSentence } = mapping
    const quickdialNumber = messageV2.quickdialNumber
    // add fwisCode to instruction target area url
    const withFwisCode = instructionTemplate.replace('{{ fwisCode }}', messageV2.fwisCode)
    // if we have a number inject into the sentence, otherwise remove the sentence fully
    const quickdialText = quickdialNumber
      ? quickdialSentence.replace('{{ quickdialNumber }}', quickdialNumber)
      : ''
    messageV2.instruction = withFwisCode.replace('{{ quickdialSentence }}', quickdialText)
  }

  messageV2.addParameter('awareness_level', mapping?.awarenessLevel || '')
  messageV2.addParameter('awareness_type', '12; Flooding')
  messageV2.addParameter('impacts', mapping?.impact || '')
  messageV2.addParameter('use_polygon_over_geocode', 'true')
  messageV2.addParameter('uk_ea_ta_code', message.fwisCode)
  messageV2.removeNode('geocode')

  return messageV2
}