-
Notifications
You must be signed in to change notification settings - Fork 214
Expand file tree
/
Copy pathparse.py
More file actions
430 lines (359 loc) · 16.5 KB
/
parse.py
File metadata and controls
430 lines (359 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
import time
import struct
from openbci.utils.constants import Constants as k
class ParseRaw(object):
def __init__(self,
board_type=k.BOARD_CYTON,
gains=None,
log=False,
micro_volts=False,
scaled_output=True):
self.board_type = board_type
self.gains = gains
self.log = log
self.micro_volts = micro_volts
self.scale_factors = []
self.scaled_output = scaled_output
if gains is not None:
self.scale_factors = self.get_ads1299_scale_factors(self.gains, self.micro_volts)
self.raw_data_to_sample = RawDataToSample(gains=gains,
scale=scaled_output,
scale_factors=self.scale_factors,
verbose=log)
def is_stop_byte(self, byte):
"""
Used to check and see if a byte adheres to the stop byte structure
of 0xCx where x is the set of numbers from 0-F in hex of 0-15 in decimal.
:param byte: {int} - The number to test
:return: {boolean} - True if `byte` follows the correct form
"""
return (byte & 0xF0) == k.RAW_BYTE_STOP
def get_ads1299_scale_factors(self, gains, micro_volts=None):
out = []
for gain in gains:
scale_factor = k.ADS1299_VREF / float((pow(2, 23) - 1)) / float(gain)
if micro_volts is None:
if self.micro_volts:
scale_factor *= 1000000.
else:
if micro_volts:
scale_factor *= 1000000.
out.append(scale_factor)
return out
def get_channel_data_array(self, raw_data_to_sample):
"""
:param raw_data_to_sample: RawDataToSample
:return:
"""
channel_data = []
number_of_channels = len(raw_data_to_sample.scale_factors)
daisy = number_of_channels == k.NUMBER_OF_CHANNELS_DAISY
channels_in_packet = k.NUMBER_OF_CHANNELS_CYTON
if not daisy:
channels_in_packet = number_of_channels
# Channel data arrays are always 8 long
for i in range(channels_in_packet):
counts = self.interpret_24_bit_as_int_32(
raw_data_to_sample.raw_data_packet[
(i * 3) +
k.RAW_PACKET_POSITION_CHANNEL_DATA_START:(i * 3) +
k.RAW_PACKET_POSITION_CHANNEL_DATA_START + 3
]
)
channel_data.append(
raw_data_to_sample.scale_factors[i] *
counts if raw_data_to_sample.scale else counts
)
return channel_data
def get_data_array_accel(self, raw_data_to_sample):
accel_data = []
for i in range(k.RAW_PACKET_ACCEL_NUMBER_AXIS):
counts = self.interpret_16_bit_as_int_32(
raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_START_AUX +
(i * 2): k.RAW_PACKET_POSITION_START_AUX + (i * 2) + 2])
accel_data.append(k.CYTON_ACCEL_SCALE_FACTOR_GAIN *
counts if raw_data_to_sample.scale else counts)
return accel_data
def get_raw_packet_type(self, stop_byte):
return stop_byte & 0xF
def interpret_16_bit_as_int_32(self, two_byte_buffer):
return struct.unpack('>h', two_byte_buffer)[0]
def interpret_24_bit_as_int_32(self, three_byte_buffer):
# 3 byte ints
unpacked = struct.unpack('3B', three_byte_buffer)
# 3byte int in 2s compliment
if unpacked[0] > 127:
pre_fix = bytes(bytearray.fromhex('FF'))
else:
pre_fix = bytes(bytearray.fromhex('00'))
three_byte_buffer = pre_fix + three_byte_buffer
# unpack little endian(>) signed integer(i) (makes unpacking platform independent)
return struct.unpack('>i', three_byte_buffer)[0]
def parse_packet_standard_accel(self, raw_data_to_sample):
"""
:param raw_data_to_sample: RawDataToSample
:return:
"""
# Check to make sure data is not null.
if raw_data_to_sample is None:
raise RuntimeError(k.ERROR_UNDEFINED_OR_NULL_INPUT)
if raw_data_to_sample.raw_data_packet is None:
raise RuntimeError(k.ERROR_UNDEFINED_OR_NULL_INPUT)
# Check to make sure the buffer is the right size.
if len(raw_data_to_sample.raw_data_packet) != k.RAW_PACKET_SIZE:
raise RuntimeError(k.ERROR_INVALID_BYTE_LENGTH)
# Verify the correct stop byte.
if raw_data_to_sample.raw_data_packet[0] != k.RAW_BYTE_START:
raise RuntimeError(k.ERROR_INVALID_BYTE_START)
sample_object = OpenBCISample()
sample_object.accel_data = self.get_data_array_accel(raw_data_to_sample)
sample_object.channel_data = self.get_channel_data_array(raw_data_to_sample)
sample_object.sample_number = raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_SAMPLE_NUMBER
]
sample_object.start_byte = raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_START_BYTE
]
sample_object.stop_byte = raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_STOP_BYTE
]
sample_object.valid = True
now_ms = int(round(time.time() * 1000))
sample_object.timestamp = now_ms
sample_object.boardTime = 0
return sample_object
def parse_packet_standard_raw_aux(self, raw_data_to_sample):
"""
:param raw_data_to_sample: RawDataToSample
:return:
"""
# Check to make sure data is not null.
if raw_data_to_sample is None:
raise RuntimeError(k.ERROR_UNDEFINED_OR_NULL_INPUT)
if raw_data_to_sample.raw_data_packet is None:
raise RuntimeError(k.ERROR_UNDEFINED_OR_NULL_INPUT)
# Check to make sure the buffer is the right size.
if len(raw_data_to_sample.raw_data_packet) != k.RAW_PACKET_SIZE:
raise RuntimeError(k.ERROR_INVALID_BYTE_LENGTH)
# Verify the correct stop byte.
if raw_data_to_sample.raw_data_packet[0] != k.RAW_BYTE_START:
raise RuntimeError(k.ERROR_INVALID_BYTE_START)
sample_object = OpenBCISample()
sample_object.aux_data = raw_data_to_sample.raw_data_packet[k.RAW_PACKET_POSITION_START_AUX:k.RAW_PACKET_POSITION_STOP_AUX+1]
if(len(sample_object.aux_data) != 0):
# extract value sample_object.aux_data
sample_object.analog_data = {
'A5': sample_object.aux_data[0] << 8 | sample_object.aux_data[1],
'A6': sample_object.aux_data[2] << 8 | sample_object.aux_data[3]
}
sample_object.digital_data = {
'D11': ((sample_object.aux_data[0] & 0xFF00) >> 8),
'D12': sample_object.aux_data[0] & 0xFF,
'D17': sample_object.aux_data[1] & 0xFF
}
sample_object.packet_type = k.RAW_PACKET_TYPE_STANDARD_RAW_AUX
sample_object.channel_data = self.get_channel_data_array(raw_data_to_sample)
sample_object.sample_number = raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_SAMPLE_NUMBER
]
sample_object.start_byte = raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_START_BYTE
]
sample_object.stop_byte = raw_data_to_sample.raw_data_packet[
k.RAW_PACKET_POSITION_STOP_BYTE
]
sample_object.valid = True
now_ms = int(round(time.time() * 1000))
sample_object.timestamp = now_ms
sample_object.boardTime = 0
return sample_object
def parse_packet_time_synced_accel(self, raw_data_to_sample):
pass
def parse_packet_time_synced_raw_aux(self, raw_data_to_sample):
pass
def set_ads1299_scale_factors(self, gains, micro_volts=None):
self.scale_factors = self.get_ads1299_scale_factors(gains, micro_volts=micro_volts)
def transform_raw_data_packet_to_sample(self, raw_data):
"""
Used transform raw data packets into fully qualified packets
:param raw_data:
:return:
"""
try:
self.raw_data_to_sample.raw_data_packet = raw_data
packet_type = self.get_raw_packet_type(raw_data[k.RAW_PACKET_POSITION_STOP_BYTE])
if packet_type == k.RAW_PACKET_TYPE_STANDARD_ACCEL:
sample = self.parse_packet_standard_accel(self.raw_data_to_sample)
elif packet_type == k.RAW_PACKET_TYPE_STANDARD_RAW_AUX:
sample = self.parse_packet_standard_raw_aux(self.raw_data_to_sample)
elif packet_type == k.RAW_PACKET_TYPE_ACCEL_TIME_SYNC_SET or \
packet_type == k.RAW_PACKET_TYPE_ACCEL_TIME_SYNCED:
sample = self.parse_packet_time_synced_accel(self.raw_data_to_sample)
elif packet_type == k.RAW_PACKET_TYPE_RAW_AUX_TIME_SYNC_SET or \
packet_type == k.RAW_PACKET_TYPE_RAW_AUX_TIME_SYNCED:
sample = self.parse_packet_time_synced_raw_aux(self.raw_data_to_sample)
else:
sample = OpenBCISample()
sample.error = 'This module does not support packet type %d' % packet_type
sample.valid = False
sample.packet_type = packet_type
except BaseException as e:
sample = OpenBCISample()
if hasattr(e, 'message'):
sample.error = e.message
else:
sample.error = e
sample.valid = False
return sample
def make_daisy_sample_object_wifi(self, lower_sample_object, upper_sample_object):
"""
/**
* @description Used to make one sample object from two sample
* objects. The sample number of the new daisy sample will be the
* upperSampleObject's sample number divded by 2. This allows us
* to preserve consecutive sample numbers that flip over at 127
* instead of 255 for an 8 channel. The daisySampleObject will
* also have one `channelData` array with 16 elements inside it,
* with the lowerSampleObject in the lower indices and the
* upperSampleObject in the upper set of indices. The auxData from
* both channels shall be captured in an object called `auxData`
* which contains two arrays referenced by keys `lower` and
* `upper` for the `lowerSampleObject` and `upperSampleObject`,
* respectively. The timestamps shall be averaged and moved into
* an object called `timestamp`. Further, the un-averaged
* timestamps from the `lowerSampleObject` and `upperSampleObject`
* shall be placed into an object called `_timestamps` which shall
* contain two keys `lower` and `upper` which contain the original
* timestamps for their respective sampleObjects.
* @param lowerSampleObject {Object} - Lower 8 channels with odd sample number
* @param upperSampleObject {Object} - Upper 8 channels with even sample number
* @returns {Object} - The new merged daisy sample object
*/
"""
daisy_sample_object = OpenBCISample()
if lower_sample_object.channel_data is not None:
daisy_sample_object.channel_data = lower_sample_object.channel_data + \
upper_sample_object.channel_data
daisy_sample_object.sample_number = upper_sample_object.sample_number
daisy_sample_object.id = daisy_sample_object.sample_number
daisy_sample_object.aux_data = {
'lower': lower_sample_object.aux_data,
'upper': upper_sample_object.aux_data
}
if lower_sample_object.timestamp:
daisy_sample_object.timestamp = lower_sample_object.timestamp
daisy_sample_object.stop_byte = lower_sample_object.stop_byte
daisy_sample_object._timestamps = {
'lower': lower_sample_object.timestamp,
'upper': upper_sample_object.timestamp
}
if lower_sample_object.accel_data:
if lower_sample_object.accel_data[0] > 0 or lower_sample_object.accel_data[1] > 0 or \
lower_sample_object.accel_data[2] > 0:
daisy_sample_object.accel_data = lower_sample_object.accel_data
else:
daisy_sample_object.accel_data = upper_sample_object.accel_data
daisy_sample_object.valid = True
return daisy_sample_object
"""
/**
* @description Used transform raw data packets into fully qualified packets
* @param o {RawDataToSample} - Used to hold data and configuration settings
* @return {Array} samples An array of {Sample}
* @author AJ Keller (@aj-ptw)
*/
function transformRawDataPacketsToSample (o) {
let samples = [];
for (let i = 0; i < o.rawDataPackets.length; i++) {
o.rawDataPacket = o.rawDataPackets[i];
const sample = transformRawDataPacketToSample(o);
samples.push(sample);
if (sample.hasOwnProperty('sampleNumber')) {
o['lastSampleNumber'] = sample.sampleNumber;
} else if (!sample.hasOwnProperty('impedanceValue')) {
o['lastSampleNumber'] = o.rawDataPacket[k.OBCIPacketPositionSampleNumber];
}
}
return samples;
}
"""
def transform_raw_data_packets_to_sample(self, raw_data_packets):
samples = []
for raw_data_packet in raw_data_packets:
sample = self.transform_raw_data_packet_to_sample(raw_data_packet)
samples.append(sample)
self.raw_data_to_sample.last_sample_number = sample.sample_number
return samples
class RawDataToSample(object):
"""Object encapulsating a parsing object."""
def __init__(self,
accel_data=None,
gains=None,
last_sample_number=0,
raw_data_packets=None,
raw_data_packet=None,
scale=True,
scale_factors=None,
time_offset=0,
verbose=False):
"""
RawDataToSample
:param accel_data: list
The channel settings array
:param gains: list
The gains of each channel, this is used to derive number of channels
:param last_sample_number: int
:param raw_data_packets: list
list of raw_data_packets
:param raw_data_packet: bytearray
A single raw data packet
:param scale: boolean
Default `true`. A gain of 24 for Cyton will be used and 51 for ganglion by default.
:param scale_factors: list
Calculated scale factors
:param time_offset: int
For non time stamp use cases i.e. 0xC0 or 0xC1 (default and raw aux)
:param verbose:
"""
self.accel_data = accel_data if accel_data is not None else []
self.gains = gains if gains is not None else []
self.time_offset = time_offset
self.last_sample_number = last_sample_number
self.raw_data_packets = raw_data_packets if raw_data_packets is not None else []
self.raw_data_packet = raw_data_packet
self.scale = scale
self.scale_factors = scale_factors if scale_factors is not None else []
self.verbose = verbose
class OpenBCISample(object):
"""Object encapulsating a single sample from the OpenBCI board."""
def __init__(self,
aux_data=None,
board_time=0,
channel_data=None,
error=None,
imp_data=None,
packet_type=k.RAW_PACKET_TYPE_STANDARD_ACCEL,
protocol=k.PROTOCOL_WIFI,
sample_number=0,
start_byte=0,
stop_byte=0,
valid=True,
accel_data=None):
self.aux_data = aux_data if aux_data is not None else []
self.board_time = board_time
self.channel_data = channel_data if aux_data is not None else []
self.error = error
self.id = sample_number
self.imp_data = imp_data if aux_data is not None else []
self.packet_type = packet_type
self.protocol = protocol
self.sample_number = sample_number
self.start_byte = start_byte
self.stop_byte = stop_byte
self.timestamp = 0
self._timestamps = {}
self.valid = valid
self.accel_data = accel_data if accel_data is not None else []
self.analog_data = {}
self.digital_data = {}