-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathutility.py
More file actions
189 lines (152 loc) · 8.31 KB
/
utility.py
File metadata and controls
189 lines (152 loc) · 8.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import os, sys, copy
import numpy as np
from scipy.integrate import cumulative_trapezoid
from scipy.optimize import minimize
from scipy.stats import median_abs_deviation, norm
def huber(x, M):
"""Huber loss function, for outlier-robust applications, `see here <https://www.cvxpy.org/api_reference/cvxpy.atoms.elementwise.html#huber>`_"""
absx = np.abs(x)
return np.where(absx <= M, 0.5*x**2, M*(absx - 0.5*M))
def huber_const(M):
"""Scale that makes :code:`sum(huber())` interpolate :math:`\\sqrt{2}\\|\\cdot\\|_1` and :math:`\\frac{1}{2}\\|\\cdot\\|_2^2`,
from https://jmlr.org/papers/volume14/aravkin13a/aravkin13a.pdf, with correction for missing sqrt"""
a = 2*np.exp(-M**2 / 2)/M
b = np.sqrt(2*np.pi)*(2*norm.cdf(M) - 1)
return np.sqrt((2*a*(1 + M**2)/M**2 + b)/(a + b))
def integrate_dxdt_hat(dxdt_hat, dt_or_t):
"""Wrapper for scipy.integrate.cumulative_trapezoid. Use 0 as first value so lengths match, see #88.
:param np.array[float] dxdt_hat: estimate derivative of timeseries
:param float dt_or_t: step size if given as a scalar or a vector of sample locations
:return: **x_hat** (np.array[float]) -- integral of dxdt_hat
"""
return cumulative_trapezoid(dxdt_hat, initial=0)*dt_or_t if np.isscalar(dt_or_t) \
else cumulative_trapezoid(dxdt_hat, x=dt_or_t, initial=0)
def estimate_integration_constant(x, x_hat, M=6):
"""Integration leaves an unknown integration constant. This function finds a best fit integration
constant to correct the DC of :code:`x_hat` (the integral of dxdt_hat) by optimizing
:math:`\\min_c J(x - \\hat{x} + c)`, where :math:`J` is the Huber loss function or the :math:`\\ell_1`
or :math:`\\ell_2` norm.
:param np.array[float] x: timeseries of measurements
:param np.array[float] x_hat: smoothed estimate of x
:param float M: constant estimation is robustified with the Huber loss. :code:`M` here is in units of scaled
mean absolute deviation of residuals, so scatter can be calculated and used to normalize without being
thrown off by outliers. The default is intended to capture the idea of "six sigma": Assuming Gaussian
:code:`x - xhat` errors, the portion of inliers beyond the Huber loss' transition is only about 1.97e-9.
:return: **integration constant** (float) -- initial condition that best aligns x_hat with x
"""
sigma = median_abs_deviation(x - x_hat, scale='normal') # M is in units of this robust scatter metric
if M == float('inf') or sigma < 1e-3: # If no scatter, then no outliers, so use L2
return np.mean(x - x_hat) # Solves the l2 distance minimization, argmin_{x0} ||x_hat + x0 - x||_2^2
elif M < 1e-3: # small M looks like l1 loss, and Huber gets too flat to work well
return np.median(x - x_hat) # Solves the l1 distance minimization, argmin_{x0} ||x_hat + x0 - x||_1
else:
return minimize(lambda x0: np.sum(huber(x - (x_hat+x0), M*sigma)), # fn to minimize in 1st argument
0, method='SLSQP').x[0] # result is a vector, even if initial guess is just a scalar
def mean_kernel(window_size):
"""A uniform boxcar of total integral 1"""
return np.ones(window_size)/window_size
def gaussian_kernel(window_size):
"""A truncated gaussian"""
sigma = window_size / 6.
t = np.linspace(-2.7*sigma, 2.7*sigma, window_size)
ker = 1/np.sqrt(2*np.pi*sigma**2) * np.exp(-(t**2)/(2*sigma**2)) # gaussian function itself
return ker / np.sum(ker)
def friedrichs_kernel(window_size):
"""A bump function"""
x = np.linspace(-0.999, 0.999, window_size)
ker = np.exp(-1/(1-x**2))
return ker / np.sum(ker)
def convolutional_smoother(x, kernel, num_iterations=1):
"""Perform smoothing by convolving x with a kernel.
:param np.array[float] x: 1D data
:param np.array[float] kernel: kernel to use in convolution
:param int num_iterations: number of iterations, >=1
:return: **x_hat** (np.array[float]) -- smoothed x
"""
pad_width = len(kernel)//2
x_hat = x
for i in range(num_iterations):
x_padded = np.pad(x_hat, pad_width, mode='symmetric') # pad with repetition of the edges
x_hat = np.convolve(x_padded, kernel, 'valid')[:len(x)] # 'valid' slices out only full-overlap spots
return x_hat
def slide_function(func, x, dt, kernel, *args, stride=1, pass_weights=False, **kwargs):
"""Slide a smoothing derivative function across a timeseries with specified window size, and
combine the results according to kernel weights.
:param callable func: name of the function to slide
:param np.array[float] x: data to differentiate
:param float dt: step size
:param np.array[float] kernel: values to weight the sliding window
:param list args: passed to func
:param int stride: step size for slide (e.g. 1 means slide by 1 step)
:param bool pass_weights: whether weights should be passed to func via update to kwargs
:param dict kwargs: passed to func
:return: tuple[np.array, np.array] of\n
- **x_hat** -- estimated (smoothed) x
- **dxdt_hat** -- estimated derivative of x
"""
if len(kernel) % 2 == 0: raise ValueError("Kernel window size should be odd.")
half_window_size = (len(kernel) - 1)//2 # int because len(kernel) is always odd
x_hat = np.zeros(x.shape)
dxdt_hat = np.zeros(x.shape)
weight_sum = np.zeros(x.shape)
for i,midpoint in enumerate(range(0, len(x), stride)): # iterate window midpoints
# find where to index data and kernel, taking care at edges
start = max(0, midpoint - half_window_size)
end = min(len(x), midpoint + half_window_size + 1) # +1 because slicing is exclusive of end
window = slice(start, end)
kstart = max(0, half_window_size - midpoint)
kend = kstart + (end - start)
kslice = slice(kstart, kend)
w = kernel if (end-start) == len(kernel) else kernel[kslice]/np.sum(kernel[kslice])
if pass_weights: kwargs['weights'] = w
# run the function on the window and add weighted results to cumulative answers
x_window_hat, dxdt_window_hat = func(x[window], dt, *args, **kwargs)
x_hat[window] += w * x_window_hat
dxdt_hat[window] += w * dxdt_window_hat
weight_sum[window] += w # save sum of weights for normalization at the end
return x_hat/weight_sum, dxdt_hat/weight_sum
def peakdet(x, delta, t=None):
"""Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal
value, and was preceded (to the left) by a value lower by delta. Converted from MATLAB script at
http://billauer.co.il/peakdet.html Eli Billauer, 3.4.05 (Explicitly not copyrighted). This function
is released to the public domain; Any use is allowed.
:param np.array[float] x: array for which to find peaks and valleys
:param float delta: threshold for finding peaks and valleys. A point is considered a maximum peak
if it has the maximal value, and was preceded (to the left) by a value lower by delta.
:param np.array[float] t: optional domain points where data comes from, to make indices into locations
:return: tuple[np.array, np.array] of\n
- **maxtab** -- indices or locations (column 1) and values (column 2) of maxima
- **mintab** -- indices or locations (column 1) and values (column 2) of minima
"""
maxtab = []
mintab = []
if t is None:
t = np.arange(len(x))
elif len(x) != len(t):
raise ValueError('Input vectors x and t must have same length')
if not (np.isscalar(delta) and delta > 0):
raise ValueError('Input argument delta must be a positive scalar')
mn, mx = np.inf, -1*np.inf
mnpos, mxpos = np.nan, np.nan
lookformax = True
for i in np.arange(len(x)):
this = x[i]
if this > mx:
mx = this
mxpos = t[i]
if this < mn:
mn = this
mnpos = t[i]
if lookformax:
if this < mx-delta:
maxtab.append((mxpos, mx))
mn = this
mnpos = t[i]
lookformax = False # now searching for a min
else:
if this > mn+delta:
mintab.append((mnpos, mn))
mx = this
mxpos = t[i]
lookformax = True # now searching for a max
return np.array(maxtab), np.array(mintab)