forked from diffpy/diffpy.morph
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtools.py
More file actions
280 lines (225 loc) · 7.6 KB
/
tools.py
File metadata and controls
280 lines (225 loc) · 7.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#!/usr/bin/env python
##############################################################################
#
# diffpy.morph by DANSE Diffraction group
# Simon J. L. Billinge
# (c) 2010 Trustees of the Columbia University
# in the City of New York. All rights reserved.
#
# File coded by: Chris Farrow
#
# See AUTHORS.txt for a list of people who contributed.
# See LICENSE.txt for license information.
#
##############################################################################
"""Tools used in morphs and morph chains.
"""
import numpy
from diffpy.utils.parsers.loaddata import loadData
from diffpy.utils.parsers.serialization import deserialize_data
def estimateScale(y_morph_in, y_target_in):
    """Return the scale that best matches the morph to the target.

    The scale is the dot product of the morph with the target divided by
    the dot product of the morph with itself.
    """
    overlap = numpy.dot(y_morph_in, y_target_in)
    morph_norm = numpy.dot(y_morph_in, y_morph_in)
    return overlap / morph_norm
def estimateBaselineSlope(r, gr, rmin=None, rmax=None):
    """Estimate the slope of the linear baseline of a PDF.

    A one-parameter line, slope*r, is fitted to the PDF with a
    least-squares optimizer. Residuals are inflated whenever the line lies
    above the PDF, which biases the fit toward the bottom of the PDF.

    Parameters
    ----------
    r
        The r-grid used for the PDF.
    gr
        The PDF over the r-grid.
    rmin
        The minimum r-value to consider. If this is None (default), no
        lower cutoff is applied.
    rmax
        The maximum r-value to consider. If this is None (default), no
        upper cutoff is applied.

    Returns
    -------
    slope
        The fitted slope, exactly as returned by the optimizer (an array
        holding the single fit parameter). If the PDF is scaled properly,
        this is equal to -4*pi*rho0.
    """
    from scipy.optimize import leastsq

    # Work on copies so the caller's arrays are never modified.
    r_fit = r.copy()
    gr_fit = gr.copy()

    if rmax is not None:
        keep = r_fit <= rmax
        gr_fit = gr_fit[keep]
        r_fit = r_fit[keep]

    if rmin is not None:
        keep = r_fit >= rmin
        gr_fit = gr_fit[keep]
        r_fit = r_fit[keep]

    def residuals(pars):
        # Plain distance between the PDF and the trial baseline.
        residual = gr_fit - pars[0] * r_fit

        # Keep only the points where the baseline sits above the PDF
        # (negative residuals); their squared sum is the penalty.
        below = residual.copy()
        below[below > 0] = 0
        penalty = numpy.dot(below, below)

        # Scale every residual up by the penalty so that such fits cost more.
        residual *= 1 + 0.5 * penalty
        return residual

    # Optimize starting from a flat baseline.
    best_fit, _status = leastsq(residuals, [0.0])
    return best_fit
def getRw(chain):
    """Get Rw from the outputs of a morph or chain.

    Parameters
    ----------
    chain
        Object whose ``xyallout`` attribute unpacks into
        (x_morph, y_morph, x_target, y_target).

    Returns
    -------
    rw
        The sum of x_morph * (y_target - y_morph)**2 divided by the sum of
        x_morph * y_morph**2. No square root is applied.
    """
    # NOTE(review): presumably xyallout already holds morph and target on a
    # common grid; nothing is regridded here -- confirm against the caller.
    x_morph, y_morph, _x_target, y_target = chain.xyallout
    diff = y_target - y_morph
    weighted_residual = numpy.dot(x_morph * diff, diff)
    weighted_norm = numpy.dot(x_morph * y_morph, y_morph)
    return weighted_residual / weighted_norm
def get_pearson(chain):
    """Get the Pearson correlation coefficient from a morph or chain.

    Parameters
    ----------
    chain
        Object whose ``xyallout`` attribute unpacks into
        (x_morph, y_morph, x_target, y_target).

    Returns
    -------
    pcc
        The correlation coefficient between y_morph and y_target as
        computed by scipy.stats.pearsonr. The p-value is discarded.
    """
    from scipy.stats import pearsonr

    _x_morph, morph_values, _x_target, target_values = chain.xyallout
    coefficient, _p_value = pearsonr(morph_values, target_values)
    return coefficient
def readPDF(fname):
    """Read a .gr file and load its r and G(r) vectors.

    Parameters
    ----------
    fname
        Name of the file we want to read.

    Returns
    -------
    r,gr
        The first two arrays read from the data, or (None, None) when
        fewer than two arrays were loaded.
    """
    columns = loadData(fname, unpack=True)
    if len(columns) < 2:
        return (None, None)
    return columns[:2]
def nn_value(val, name):
    """Convenience function for ensuring certain non-negative inputs.

    Parameters
    ----------
    val
        The value to check.
    name
        Name of the input, used in the printed warning.

    Returns
    -------
    val unchanged when it is not negative; otherwise a warning is printed
    and -val is returned.
    """
    if not val < 0:
        return val

    print(
        f"\n# Negative value for {name} given. "
        "Using absolute value instead."
    )
    return -val
def deserialize(serial_file):
    """Deserialize a file by delegating to diffpy.utils' deserialize_data.

    Parameters
    ----------
    serial_file
        Name of file to deserialize.

    Returns
    -------
    dict
        Data read from serial file.
    """
    loaded = deserialize_data(serial_file)
    return loaded
def case_insensitive_dictionary_search(key: str, dictionary: dict):
    """Search for key in dictionary ignoring case.

    The first dictionary key (in iteration order) that equals ``key`` after
    lower-casing both is used for the lookup. Dictionary keys that are not
    strings are skipped during the case-insensitive comparison.

    Parameters
    ----------
    key: str
        The key to look for.
    dictionary: dict
        The dictionary to search.

    Returns
    -------
    value or None
        Corresponding value if key is in dictionary. None otherwise.
    """
    # A non-string key has no notion of case; fall back to an exact lookup.
    if not isinstance(key, str):
        return dictionary.get(key)

    # Lower-case the requested key once rather than on every iteration.
    wanted = key.lower()
    for candidate in dictionary:
        # Non-string keys cannot be lower-cased and can never match.
        if isinstance(candidate, str) and candidate.lower() == wanted:
            return dictionary[candidate]
    return None
def field_sort(
    filepaths: list, field, reverse=False, serfile=None, get_field_values=False
):
    """Sort a list of files by a field stored in header information.

    All files must contain this header information.

    Parameters
    ----------
    filepaths
        List of paths to files that we want to sort.
    field
        The field we want to sort by. Not case-sensitive.
    reverse
        Sort in reverse alphabetical/numerical order.
    serfile
        Path to a serial file with field information for each file. When
        given, field values are looked up there (by file name) instead of
        in each file's header.
    get_field_values: bool
        Boolean indicating whether to also return a List of field values
        (default False). This List of field values is parallel to the sorted
        list of filepaths with items in the same position corresponding to
        each other.

    Returns
    -------
    list
        Sorted list of paths. When get_field_values is true, also return an
        associated field list.

    Raises
    ------
    KeyError
        If the field values cannot be compared during sorting (for example
        because the field is missing for some file), or if a file has no
        entry in the serial file.
    """
    # Collect [path, field value] pairs for every file.
    files_field_values = []
    if serfile is None:
        # Read the field from each file's own header.
        for path in filepaths:
            fhd = loadData(path, headers=True)
            files_field_values.append(
                [path, case_insensitive_dictionary_search(field, fhd)]
            )
    else:
        import pathlib

        # Read the field for every file from the serial file.
        des_dict = deserialize_data(serfile)
        for path in filepaths:
            # Entries in the serial file are looked up by bare file name.
            name = pathlib.Path(path).name
            file_entry = des_dict.get(name)
            if file_entry is None:
                # No metadata at all for this file: report it the same way
                # as a missing field instead of failing on a None lookup.
                raise KeyError("Field missing.")
            fv = case_insensitive_dictionary_search(field, file_entry)
            files_field_values.append([path, fv])

    # Sort files by field, reverse if reverse flag true.
    try:
        files_field_values.sort(key=lambda entry: entry[1], reverse=reverse)
    except (ValueError, TypeError) as err:
        # Raised if fields for any file are missing (None is not orderable).
        raise KeyError("Field missing.") from err

    sorted_paths = [pair[0] for pair in files_field_values]
    if get_field_values:
        return sorted_paths, [pair[1] for pair in files_field_values]
    return sorted_paths
def get_values_from_dictionary_collection(
    dictionary_collection: iter, target_key
):
    """In an (iterable) collection of dictionaries, search for a target key
    in each dictionary. Return a list of all found values corresponding
    to that key.

    Parameters
    ----------
    dictionary_collection: iter
        The collection of dictionaries to search through. If the collection
        is itself a dictionary (or a dict subclass), its values are taken to
        be the dictionaries to search.
    target_key
        The key to search for in each dictionary (case-insensitive). For
        each dictionary in dictionary_collection that has that key, the
        corresponding value is appended to a List called values.

    Returns
    -------
    list
        The found values. Entries whose lookup yields None are omitted.
    """
    # Handle dictionary-type iterables, including dict subclasses such as
    # OrderedDict: the dictionaries are assumed to be stored in the values,
    # with the keys naming them.
    if isinstance(dictionary_collection, dict):
        dictionary_collection = dictionary_collection.values()

    # All other iterables are handled the same way as a list.
    values = []
    for entry in dictionary_collection:
        search_result = case_insensitive_dictionary_search(target_key, entry)
        if search_result is not None:
            values.append(search_result)
    return values