import logging
import sys
import time
import warnings
from functools import wraps
import requests
import threading
import itertools

# Python 2/3 compatibility: the Queue module was renamed to queue in Python 3
try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3

# Maximum number of IDs sent in a single REST call; longer ID lists are
# split into batches of this size
_CALL_BATCH_SIZE = 200
# Default number of worker threads used to fetch ID batches in parallel
_NUM_THREADS_DEFAULT = 4
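
# For example, with _CALL_BATCH_SIZE = 200, a query of 450 IDs is split by
# get() into three batches (200, 200 and 50 IDs), fetched in parallel by up
# to _NUM_THREADS_DEFAULT = 4 worker threads.
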
def _create_rest_url(host, version, species, category, subcategory,
resource, query_id, options):
"""Creates the URL for querying the REST service"""
# Creating the basic URL
url_items = [host, 'webservices/rest', version, species, category,
subcategory, query_id, resource]
    url_items = filter(None, url_items)  # Some URL items can be empty
    url = '/'.join(url_items)
# Checking optional params
if options is not None:
opts = []
for k, v in options.items():
if k == 'debug':
continue
if isinstance(v, list):
opts.append(k + '=' + ','.join(map(str, v)))
else:
opts.append(k + '=' + str(v))
if opts:
url += '?' + '&'.join(opts)
return url
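

# Illustrative sketch (hypothetical host and values, not part of the original
# module): _create_rest_url would assemble a URL like the one below; empty URL
# items are skipped and the 'debug' option is excluded from the query string.
#
#   _create_rest_url(host='http://example.org', version='v4',
#                    species='hsapiens', category='feature',
#                    subcategory='gene', resource='info', query_id='BRCA2',
#                    options={'limit': 5, 'debug': True})
#   # -> 'http://example.org/webservices/rest/v4/hsapiens/feature/gene/BRCA2/info?limit=5'
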
def _fetch(session, host, version, species, category, subcategory, resource,
query_id=None, options=None, method='get', data=None):
"""Queries the REST service retrieving results until exhaustion or limit"""
# HERE BE DRAGONS
final_response = None
# Setting up skip and limit default parameters
call_skip = 0
call_limit = 1000
max_limit = None
if options is None:
opts = {'skip': call_skip, 'limit': call_limit}
else:
opts = options.copy() # Do not modify original data!
if 'skip' not in opts:
opts['skip'] = call_skip
# If 'limit' is specified, a maximum of 'limit' results will be returned
if 'limit' in opts:
max_limit = opts['limit']
        # The server must always be queried for results in groups of 1000
opts['limit'] = call_limit
# If there is a query_id, the next variables will be used
total_id_list = [] # All initial ids
next_id_list = [] # Ids which should be queried again for more results
next_id_indexes = [] # Ids position in the final response
if query_id is not None:
total_id_list = query_id.split(',')
    # If some query has more than 'call_limit' results, the server will be
    # queried again to retrieve the next 'call_limit' results
call = True
current_query_id = None # Current REST query
current_id_list = None # Current list of ids
time_out_counter = 0 # Number of times a query is repeated due to time-out
while call:
# Check 'limit' parameter if there is a maximum limit of results
if max_limit is not None and max_limit <= call_limit:
opts['limit'] = max_limit
# Updating query_id and list of ids to query
if query_id is not None:
if current_query_id is None:
current_query_id = query_id
current_id_list = total_id_list
current_id_indexes = range(len(total_id_list))
else:
current_query_id = ','.join(next_id_list)
current_id_list = next_id_list
current_id_indexes = next_id_indexes
# Retrieving url
url = _create_rest_url(host=host,
version=version,
species=species,
category=category,
subcategory=subcategory,
query_id=current_query_id,
resource=resource,
options=opts)
        # Debug: print each assembled URL to stderr
        if options is not None and options.get('debug'):
            sys.stderr.write(url + '\n')
# Getting REST response
if method == 'get':
r = session.get(url)
elif method == 'post':
r = session.post(url, data=data)
else:
msg = 'Method "' + method + '" not implemented'
raise NotImplementedError(msg)
        if r.status_code == 504:  # Gateway Time-out
            # Give up after 99 consecutive time-outs
            if time_out_counter == 99:
msg = 'Server not responding in time'
raise requests.ConnectionError(msg)
time_out_counter += 1
time.sleep(1)
continue
time_out_counter = 0
try:
json_obj = r.json()
if 'response' in json_obj:
response = json_obj['response']
else:
return json_obj
except ValueError:
msg = 'Bad JSON format retrieved from server'
raise ValueError(msg)
# Setting up final_response
if final_response is None:
final_response = response
# Concatenating results
else:
if query_id is not None:
for index, res in enumerate(response):
id_index = current_id_indexes[index]
final_response[id_index]['result'] += res['result']
else:
final_response[0]['result'] += response[0]['result']
if query_id is not None:
# Checking which ids are completely retrieved
next_id_list = []
next_id_indexes = []
for index, res in enumerate(response):
if res['numResults'] == call_limit:
next_id_list.append(current_id_list[index])
next_id_indexes.append(current_id_indexes[index])
# Ending REST calling when there are no more ids to retrieve
if not next_id_list:
call = False
else:
# Ending REST calling when there are no more results to retrieve
if response[0]['numResults'] != call_limit:
call = False
# Skipping the first 'limit' results to retrieve the next ones
opts['skip'] += call_limit
# Subtracting the number of returned results from the maximum goal
if max_limit is not None:
max_limit -= call_limit
        # A remaining 'limit' of 0 would ask the server for *all* results,
        # so stop here instead of querying again
if max_limit == 0:
break
return final_response
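

# Worked example of the paging above (hypothetical numbers): with
# options={'limit': 2500}, the server is queried three times with limit=1000,
# limit=1000 and limit=500, 'skip' advancing 0 -> 1000 -> 2000; the loop ends
# when a call returns fewer than 'call_limit' results.
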
def _worker(queue, results, session, host, version, species, category,
subcategory, resource, options=None, method='get', data=None):
"""Manages the queue system for the threads"""
while True:
# Fetching new element from the queue
index, query_id = queue.get()
response = _fetch(session, host, version, species, category,
subcategory, resource, query_id, options, method,
data)
# Store data in results at correct index
results[index] = response
        # Signaling to the queue that the task has been processed
queue.task_done()
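

# Note: _worker loops forever by design; the worker threads created in get()
# below are daemonized, so they are discarded when the main program exits
# rather than ever returning from this function.
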
def get(session, host, version, species, category, subcategory, resource,
query_id=None, options=None, method='get', data=None):
"""Queries the REST service using multiple threads if needed"""
    # If query_id is a list, convert it to a comma-separated string
if query_id is not None and isinstance(query_id, list):
query_id = ','.join(query_id)
    # If data is a list, convert it to a comma-separated string
if data is not None and isinstance(data, list):
data = ','.join(data)
# Multithread if the number of queries is greater than _CALL_BATCH_SIZE
if query_id is None or len(query_id.split(',')) <= _CALL_BATCH_SIZE:
response = _fetch(session, host, version, species, category,
subcategory, resource, query_id, options, method,
data)
return response
else:
if options is not None and 'num_threads' in options:
num_threads = options['num_threads']
else:
num_threads = _NUM_THREADS_DEFAULT
# Splitting query_id into batches depending on the call batch size
id_list = query_id.split(',')
id_batches = [','.join(id_list[x:x+_CALL_BATCH_SIZE])
for x in range(0, len(id_list), _CALL_BATCH_SIZE)]
# Setting up the queue to hold all the id batches
q = Queue(maxsize=0)
        # Creating a fixed-size list to store the thread results in order
res = [''] * len(id_batches)
# Setting up the threads
        for _ in range(num_threads):
t = threading.Thread(target=_worker,
kwargs={'queue': q,
'results': res,
'session': session,
'host': host,
'version': version,
'species': species,
'category': category,
'subcategory': subcategory,
'resource': resource,
'options': options,
'method': method,
'data': data})
            # Daemon threads allow the main program to exit even if some
            # workers do not finish correctly
            t.daemon = True
t.start()
# Loading up the queue with index and id batches for each job
for index, batch in enumerate(id_batches):
q.put((index, batch)) # Notice this is a tuple
# Waiting until the queue has been processed
q.join()
        # Joining all the responses into one final response
final_response = list(itertools.chain.from_iterable(res))
return final_response
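

# Minimal usage sketch (hypothetical host and endpoint names, for illustration
# only; any service exposing this 'webservices/rest' layout would work):
#
#   session = requests.Session()
#   response = get(session, host='http://example.org', version='v4',
#                  species='hsapiens', category='feature', subcategory='gene',
#                  resource='info', query_id=['BRCA1', 'BRCA2'])
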
def deprecated(func):
    """Warns that the decorated function is deprecated whenever it is called"""
    @wraps(func)
    def new_func(*args, **kwargs):
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn('Call to deprecated function "{}".'.format(func.__name__),
                      category=DeprecationWarning, stacklevel=2)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return func(*args, **kwargs)
    return new_func
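

# Example (hypothetical function, for illustration only):
#
#   @deprecated
#   def old_helper():
#       return 42
#
#   old_helper()  # warns: Call to deprecated function "old_helper".
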
def retry(exception_type, total_tries=2, wait_period=2, backoff=2):
    """Retries the decorated function when it raises 'exception_type'.

    The function is attempted up to 'total_tries' times, waiting
    'wait_period' seconds before the first retry and multiplying the
    delay by 'backoff' after each failed attempt (exponential backoff).
    """
    def retry_decorator(f):
        @wraps(f)
        def func_with_retries(*args, **kwargs):
            _tries, _delay = total_tries + 1, wait_period
            while _tries > 1:
                try:
                    return f(*args, **kwargs)
                except exception_type as e:
                    _tries -= 1
                    if _tries == 1:
                        logging.error(
                            'Attempted command {} times without success'.format(
                                total_tries
                            ),
                            exc_info=True,
                        )
                        raise
                    logging.warning(
                        '"{}" raised, retrying in {} seconds. Args {}, Kwargs {}'.format(
                            e, _delay, args, kwargs
                        )
                    )
                    time.sleep(_delay)
                    # Exponential backoff: extend the wait after each failure
                    _delay *= backoff
        return func_with_retries
    return retry_decorator
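

# Example (hypothetical usage): attempt a flaky call up to three times,
# sleeping 2 s and then 4 s between attempts before giving up:
#
#   @retry(requests.ConnectionError, total_tries=3, wait_period=2, backoff=2)
#   def ping(session, url):
#       return session.get(url)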