forked from ave-dcd/dcd_mapping
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresource_utils.py
More file actions
163 lines (140 loc) · 5.2 KB
/
resource_utils.py
File metadata and controls
163 lines (140 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Provide basic utilities for fetching and storing external data."""
import logging
import os
import time
from pathlib import Path
import click
import httpx
from tqdm import tqdm
_logger = logging.getLogger(__name__)

# Spellings that CSV files commonly use for an absent/null cell. Membership
# tests against a frozenset are exact (case-sensitive), so every variant that
# should be recognized is listed explicitly.
MISSING_VALUE_REPRESENTATIONS = frozenset(
    (
        # "not available" variants
        "NA",
        "na",
        "N/A",
        "n/a",
        # "not a number" variants
        "NaN",
        "nan",
        # null / None variants
        "NULL",
        "null",
        "None",
        "none",
        # blank cell and punctuation placeholders
        "",
        "-",
        ".",
    )
)
# Settings read from the environment once, at import time. The two MaveDB
# values have no fallback and are None when the variable is unset.
MAVEDB_API_KEY = os.getenv("MAVEDB_API_KEY")
MAVEDB_BASE_URL = os.getenv("MAVEDB_BASE_URL")
ENSEMBL_API_URL = os.getenv("ENSEMBL_API_URL", "https://rest.ensembl.org")  # TODO
# NOTE(review): this default carries no URL scheme -- confirm callers add one.
CDOT_URL = os.getenv("CDOT_URL", "cdot-rest:8000")

# Local resource directory: DCD_MAPPING_RESOURCES_DIR when set, otherwise
# ~/.local/share/dcd_mapping.
LOCAL_STORE_PATH = Path(
    os.getenv(
        "DCD_MAPPING_RESOURCES_DIR",
        Path.home().joinpath(".local", "share", "dcd_mapping"),
    )
)

# Importing this module creates the directory (and any parents) if nothing
# exists at that path yet.
if not LOCAL_STORE_PATH.exists():
    LOCAL_STORE_PATH.mkdir(parents=True, exist_ok=True)
def is_missing_value(value: str | None) -> bool:
"""Check if a value represents missing/null data.
This function recognizes multiple common representations of missing data
that may appear in CSV files from external sources, making the codebase
more resilient to upstream changes in NA representation.
:param value: The value to check
:return: True if the value represents missing data, False otherwise
"""
if value is None:
return True
# Strip whitespace and check against known missing value representations
return value.strip() in MISSING_VALUE_REPRESENTATIONS
def authentication_header() -> dict | None:
    """Build the API-key request header from ``MAVEDB_API_KEY``.

    :return: ``{"X-API-key": <key>}`` when a key is configured, else ``None``
    """
    if MAVEDB_API_KEY is None:
        return None
    return {"X-API-key": MAVEDB_API_KEY}
def http_download(url: str, out_path: Path, silent: bool = True) -> Path:
    """Download a file via HTTP.

    The response body is streamed to ``out_path`` in 8 KiB chunks. The request
    carries the header returned by ``authentication_header()``. If streaming
    fails part-way, the partially written file is removed before the error
    propagates, so a truncated file is not left at ``out_path``.

    :param url: location of file to retrieve
    :param out_path: location to save file to
    :param silent: if true (the default), suppress the status message and the
        TQDM progress bar; if false, show both
    :return: Path if download successful
    :raise httpx.HTTPStatusError: if request is unsuccessful
    """
    if not silent:
        click.echo(f"Downloading {out_path.name} to {out_path.parents[0].absolute()}")
    with httpx.stream("GET", url, timeout=60, headers=authentication_header()) as r:
        r.raise_for_status()
        # Without a Content-Length header the size is unknown: pass None so
        # the bar shows a running byte count instead of a bogus total of 0.
        total_size = int(r.headers.get("content-length", 0)) or None
        with out_path.open("wb") as h:
            try:
                # disable=silent turns the bar into a no-op, so one loop
                # serves both the silent and the verbose case.
                with tqdm(
                    total=total_size,
                    unit="B",
                    unit_scale=True,
                    desc=out_path.name,
                    ncols=80,
                    disable=silent,
                ) as progress_bar:
                    for chunk in r.iter_bytes(chunk_size=8192):
                        if chunk:
                            h.write(chunk)
                            progress_bar.update(len(chunk))
            except BaseException:
                # Opening with "wb" already discarded any previous content, so
                # what is on disk now is only a partial download. Close first
                # so the unlink also works where open files cannot be removed.
                h.close()
                out_path.unlink(missing_ok=True)
                raise
    return out_path
def _retry_after_delay(header_value: str | None, fallback: float) -> float:
    """Return the delay in seconds asked for by a Retry-After header, or ``fallback`` if unusable."""
    if header_value is None:
        return fallback
    try:
        delay = float(header_value)
    except ValueError:
        # Not a plain number of seconds (e.g. the HTTP-date form of the header)
        return fallback
    # time.sleep rejects negative, infinite and NaN values; the chained
    # comparison is False for all three.
    if not 0 <= delay < float("inf"):
        return fallback
    return delay


def request_with_backoff(
    url: str, max_retries: int = 5, backoff_factor: float = 0.3, **kwargs
) -> httpx.Response:
    """HTTP GET with exponential backoff only for retryable errors.

    Retries on:
    - Connection timeout or connection errors
    - HTTP 5xx server errors
    - HTTP 429 rate limiting (respecting Retry-After when present)

    Immediately raises on other HTTP errors (e.g., 4xx client errors).

    :param url: URL to fetch
    :param max_retries: total number of attempts, including the first one
    :param backoff_factor: base delay in seconds; the wait after 0-based
        attempt ``n`` is ``backoff_factor * 2**n``
    :param kwargs: forwarded to ``httpx.get``; ``timeout`` defaults to 60
        when not supplied
    :return: the first response with a 2xx status
    :raise httpx.TimeoutException: if the final attempt times out
    :raise httpx.ConnectError: if the final attempt cannot connect
    :raise httpx.HTTPStatusError: on a non-retryable status, or on a retryable
        status returned by the final attempt
    :raise RuntimeError: if ``max_retries`` is less than 1, so that no request
        is made at all
    """
    kwargs.setdefault("timeout", 60)
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        backoff = backoff_factor * (2**attempt)
        try:
            response = httpx.get(url, **kwargs)
        except (httpx.TimeoutException, httpx.ConnectError):
            # Retry on transient network failures
            if is_last_attempt:
                raise
            time.sleep(backoff)
            continue

        status = response.status_code
        if 200 <= status < 300:
            return response

        retryable = status == 429 or 500 <= status < 600
        if is_last_attempt or not retryable:
            response.raise_for_status()
            # NOTE(review): some httpx releases may not raise here for 1xx/3xx
            # responses -- confirm against the pinned version. Raising
            # explicitly guarantees this loop can never spin on such a status.
            msg = f"Unexpected HTTP status {status} for {url}"
            raise httpx.HTTPStatusError(
                msg, request=response.request, response=response
            )

        if status == 429:
            # Too Many Requests: honor Retry-After when it is a usable number
            time.sleep(_retry_after_delay(response.headers.get("Retry-After"), backoff))
        else:
            # 5xx server error
            time.sleep(backoff)

    # Every loop iteration returns, raises or moves to the next attempt, and
    # the last one always returns or raises; this point is therefore reached
    # only when max_retries < 1.
    msg = f"Failed to fetch {url} after {max_retries} attempts"
    raise RuntimeError(msg)