-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathgeco_irig_decode.py
More file actions
executable file
·192 lines (165 loc) · 6.01 KB
/
geco_irig_decode.py
File metadata and controls
executable file
·192 lines (165 loc) · 6.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python
# (c) Stefan Countryman, 2016-2017
import sys
from datetime import datetime, timedelta
from textwrap import fill
import numpy as np
# import scipy.ndimage.filters as scf
if __name__ == "__main__" and len(sys.argv) > 1:
print(
"Usage: {} <input_file.txt\n\n".format(sys.argv[0]) +
fill(
"""Read a raw IRIG-B signal from STDIN and print out decoded
timestamps. Data must be a newline-delimited list of floating
point values representing the value of the IRIG-B signal at each
point in time. Sample rate must be 16,384 (2^14) Hz. The input
file must contain an integer number of seconds worth of data, i.e.
it must have (Sample Rate) x (N) values, where N is the number of
seconds that must be decoded, and the data must start at the
beginning of a second.
"""
)
)
exit(1)
# ------------------------------------------------------------------------------
# CONSTANTS
# ------------------------------------------------------------------------------
# max and min of histogram, and number of bins
SAMPLE_RATE = 16384 # ADC sample rate
# --------------------------------------------------------------------------
# IRIG-B RELATED CONSTANTS
# --------------------------------------------------------------------------
BITS_PER_SECOND = 100 # bits per second in IRIG-B spec
CONVOLUTION_SIGMA = 1e-4
HIGH_SIGNAL_THRESHOLD = 3500
# find the high/low test points for each type of bit (0, 1, or control);
# measured as indices from start of each bit
# TODO measure in fractions of a second and convert with SAMPLE_RATE
TEST_POINTS = np.array([20, 60, 110, 150])
# find the start of each bit
BIT_STARTS = np.round(np.arange(0, 1, 1./BITS_PER_SECOND) * SAMPLE_RATE)
ALL_TEST_POINT_INDICES = (BIT_STARTS[:, np.newaxis] + TEST_POINTS).astype(int)
# representations of each type of bit (0, 1, or control)
REP_0 = [1, 0, 0, 0]
REP_1 = [1, 1, 0, 0]
REP_C = [1, 1, 1, 0]
CURRENT_CENTURY = 20
# how many seconds does each bit represent?
SECONDS = np.zeros(BITS_PER_SECOND, dtype=int)
SECONDS[1] = 1
SECONDS[2] = 2
SECONDS[3] = 4
SECONDS[4] = 8
SECONDS[6] = 10
SECONDS[7] = 20
SECONDS[8] = 40
# how many minutes does each bit represent?
MINUTES = np.zeros(BITS_PER_SECOND, dtype=int)
MINUTES[10] = 1
MINUTES[11] = 2
MINUTES[12] = 4
MINUTES[13] = 8
MINUTES[15] = 10
MINUTES[16] = 20
MINUTES[17] = 40
# how many hours does each bit represent?
HOURS = np.zeros(BITS_PER_SECOND, dtype=int)
HOURS[20] = 1
HOURS[21] = 2
HOURS[22] = 4
HOURS[23] = 8
HOURS[25] = 10
HOURS[26] = 20
# how many days does each bit represent?
DAYS = np.zeros(BITS_PER_SECOND, dtype=int)
DAYS[30] = 1
DAYS[31] = 2
DAYS[32] = 4
DAYS[33] = 8
DAYS[35] = 10
DAYS[36] = 20
DAYS[37] = 40
DAYS[38] = 80
DAYS[40] = 100
DAYS[41] = 200
# how many years does each bit represent?
YEARS = np.zeros(BITS_PER_SECOND, dtype=int)
YEARS[50] = 1
YEARS[51] = 2
YEARS[52] = 4
YEARS[53] = 8
YEARS[55] = 10
YEARS[56] = 20
YEARS[57] = 40
YEARS[58] = 80
control_bits = np.zeros(BITS_PER_SECOND, dtype=bool)
control_bits[range(9,100,10)] = True
control_bits[0] = True
#-------------------------------------------------------------------------------
# UTILITY FUNCTIONS
#-------------------------------------------------------------------------------
def decode_timeseries(timeseries):
"""Return the full decoded information as a dictionary along with a decoded
datetime object for more convenient manipulation."""
# filter the timeseries to remove ringing at corners
# filt = scf.gaussian_filter1d(timeseries, CONVOLUTION_SIGMA * SAMPLE_RATE)
# check all test points
bits_high = (timeseries[ALL_TEST_POINT_INDICES] >= HIGH_SIGNAL_THRESHOLD).astype(int)
# represent control bits with the number 2
bits = np.zeros(BITS_PER_SECOND, dtype=int)
for i in range(bits_high.shape[0]):
if np.all(bits_high[i] == REP_0): bits[i] = 0
elif np.all(bits_high[i] == REP_1): bits[i] = 1
elif np.all(bits_high[i] == REP_C): bits[i] = 2
else: raise ValueError("Bad bit: " + str(i))
# are the control bits in the correct spots?
if not np.all((bits == 2) == control_bits):
raise ValueError("Control bits are not present where expected: \n"
+ str((bits == 2) == control_bits))
# find total seconds, minutes, hours, days, and years
decoded = {
'second': bits.dot(SECONDS),
'minute': bits.dot(MINUTES),
'hour': bits.dot(HOURS),
'day': bits.dot(DAYS),
'year': bits.dot(YEARS) + 100*CURRENT_CENTURY,
}
# parse a datetime from this
jan1 = datetime(decoded['year'], 1, 1, decoded['hour'], decoded['minute'],
decoded['second'])
decoded['datetime'] = jan1 + timedelta(decoded['day'] - 1)
return decoded
def get_date_from_timeseries(timeseries):
"""Decode the input waveform, which is assumed to be a 16384hz digitized
IRIG-B signal using DCLS (DC Level Shift)."""
return decode_timeseries(timeseries)['datetime']
def print_formatted_date(converted_date):
# finally, print the date
print(converted_date.strftime('%a %b %d %X %Y'))
# and will cause a ValueError.
def read_1_second_from_stdin():
# read in data from stdin; don't read more than a second worth of data
timeseries = np.zeros(SAMPLE_RATE)
line = ''
i = 0
while i < SAMPLE_RATE:
line = sys.stdin.readline()
if not line:
if i == 0:
raise EOFError('Hit EOF at end of a second.')
else:
raise ValueError('Hit EOF ' + str(i) + ' lines into second; '
'provide integer number of seconds of data.')
timeseries[i] = float(line)
i += 1
return timeseries
def main():
while True:
try:
timeseries = read_1_second_from_stdin()
print_formatted_date(get_date_from_timeseries(timeseries))
except EOFError:
return
# run this if we are running from command line
if __name__ == "__main__":
main()