-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy path_utils.py
More file actions
349 lines (255 loc) · 10.3 KB
/
_utils.py
File metadata and controls
349 lines (255 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
from __future__ import annotations
import hashlib
import hmac
import io
import json
import string
import time
from base64 import b64encode, urlsafe_b64encode
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
import impit
from apify_client.errors import InvalidResponseBodyError
if TYPE_CHECKING:
from datetime import timedelta
from impit import Response
from apify_client.errors import ApifyApiError
# Generic type variable for annotations.
# NOTE(review): nothing in the code shown here references it — confirm it is still needed.
T = TypeVar('T')

# Alphabet used for base62 encoding: the character at index i represents digit value i.
# Order is `0-9`, then `a-z`, then `A-Z` (62 characters in total).
_BASE62_CHARSET = string.digits + string.ascii_letters
"""Module-level constant for base62 encoding."""
@overload
def to_seconds(td: None, *, as_int: bool = ...) -> None: ...


@overload
def to_seconds(td: timedelta) -> float: ...


@overload
def to_seconds(td: timedelta, *, as_int: Literal[True]) -> int: ...


@overload
def to_seconds(td: timedelta, *, as_int: Literal[False]) -> float: ...


def to_seconds(td: timedelta | None, *, as_int: bool = False) -> float | int | None:
    """Express a timedelta as a number of seconds.

    Args:
        td: The duration to convert. None is passed through unchanged.
        as_int: When True, the result is converted with `int()`, which drops the fractional part
            (truncation toward zero, not rounding). Defaults to False.

    Returns:
        None when `td` is None; otherwise the total seconds as a float, or as an int when `as_int` is True.
    """
    if td is None:
        return None

    total = td.total_seconds()
    if as_int:
        return int(total)
    return total
def catch_not_found_or_throw(exc: ApifyApiError) -> None:
    """Swallow "not found" API errors and propagate every other API error.

    An error is treated as "not found" only when both conditions hold: its HTTP status is 404 and its
    `type` is one of the record-not-found identifiers.

    Args:
        exc: The API error to inspect.

    Raises:
        ApifyApiError: The same `exc` instance, when it is not a recognised 404 Not Found error.
    """
    status_is_404 = exc.status_code == HTTPStatus.NOT_FOUND
    type_is_not_found = exc.type in ('record-not-found', 'record-or-token-not-found')

    if status_is_404 and type_is_not_found:
        return

    raise exc
def filter_none_values(
    data: dict,
    *,
    remove_empty_dicts: bool | None = None,
) -> dict:
    """Return a copy of `data` with every None value removed, at any dictionary nesting depth.

    The Apify API ignores missing fields but may reject fields explicitly set to None. This helper prepares
    request payloads by stripping None values from nested dictionaries.

    Only dictionaries are descended into: None entries inside lists or other containers are left as they are.
    The input is not modified; nested dictionaries are rebuilt, while all other values are reused by reference.

    Both the copy and the optional pruning step use an explicit stack instead of recursion, so deeply nested
    input does not hit the interpreter's recursion limit.

    Args:
        data: Dictionary to clean.
        remove_empty_dicts: When truthy, nested dictionaries that are empty after filtering (including ones
            that were already empty in the input) are removed as well. The top-level result is always returned,
            even when it ends up empty.

    Returns:
        A new dictionary with all None values removed.
    """
    result: dict = {}

    # Stack entries are (source_dict, target_dict): copy `source` into `target`, skipping None values.
    stack: list[tuple[dict, dict]] = [(data, result)]

    while stack:
        source, target = stack.pop()
        for key, val in source.items():
            if val is None:
                continue
            if isinstance(val, dict):
                # Insert the (still empty) copy now so key order follows the source, fill it in later.
                nested: dict = {}
                target[key] = nested
                stack.append((val, nested))
            else:
                target[key] = val

    if remove_empty_dicts:
        _remove_empty_dicts_inplace(result)

    return result


def _remove_empty_dicts_inplace(data: dict[str, Any]) -> None:
    """Remove empty nested dictionaries from `data` in place, bottom-up.

    This is a helper function for filter_none_values. A dictionary that becomes empty only because all of its
    own nested dictionaries were removed is removed too. `data` itself is never removed.

    The traversal is a post-order walk driven by an explicit stack rather than recursion, so the nesting depth
    is not limited by the interpreter's recursion limit.
    """

    def dict_children(container: dict) -> Any:
        # Snapshot the dict-valued items: entries get deleted from `container` while it is being walked.
        return iter([(key, val) for key, val in container.items() if isinstance(val, dict)])

    # Each frame is (container, iterator over its dict-valued items, parent container, key in parent).
    frames = [(data, dict_children(data), None, None)]

    while frames:
        container, children, parent, parent_key = frames[-1]

        child = next(children, None)
        if child is not None:
            # Descend first; the child's emptiness is judged only once all of its own children are done.
            child_key, child_dict = child
            frames.append((child_dict, dict_children(child_dict), container, child_key))
            continue

        # All nested dictionaries of `container` are processed, so its emptiness is final.
        frames.pop()
        if parent is not None and not container:
            del parent[parent_key]
def encode_webhook_list_to_base64(webhooks: list[dict]) -> str:
    """Serialize webhook definitions to JSON and base64-encode the result for API transmission.

    Every webhook must provide `event_types` and `request_url`. `payload_template` and `headers_template`
    are copied over only when present. Keys are translated to the camelCase names used in the encoded JSON,
    and event types that are Enum members are replaced by their values.

    Args:
        webhooks: A list of webhook dictionaries with keys like "event_types", "request_url", etc.

    Returns:
        A base64-encoded JSON string.

    Raises:
        KeyError: If a webhook lacks `event_types` or `request_url`.
    """
    # (key in the input dict, key in the encoded JSON), in the order they are emitted.
    optional_fields = (
        ('payload_template', 'payloadTemplate'),
        ('headers_template', 'headersTemplate'),
    )

    encoded_webhooks = []
    for hook in webhooks:
        entry = {
            'eventTypes': [enum_to_value(event_type) for event_type in hook['event_types']],
            'requestUrl': hook['request_url'],
        }
        for source_key, api_key in optional_fields:
            if source_key in hook:
                entry[api_key] = hook[source_key]
        encoded_webhooks.append(entry)

    serialized = json.dumps(encoded_webhooks)
    return b64encode(serialized.encode('utf-8')).decode('ascii')
def encode_key_value_store_record_value(value: Any, content_type: str | None = None) -> tuple[Any, str]:
    """Prepare a value and its content type for storage in a key-value store record.

    When no content type is given it is derived from the value: binary data and file-like objects become
    `application/octet-stream`, strings become `text/plain`, and anything else is treated as JSON.

    For a JSON content type, a value that is not already a string, bytes-like or file-like object is serialized
    to compact UTF-8 encoded JSON. Objects the JSON encoder does not support are converted with `str()`, and
    NaN/Infinity are rejected. All other values are returned untouched.

    Args:
        value: The value to encode (can be dict, str, bytes, or file-like object).
        content_type: The content type; if None or empty, it's inferred from the value type.

    Returns:
        A tuple of (encoded_value, content_type).

    Raises:
        ValueError: If the value has to be JSON-serialized and contains NaN or Infinity.
    """
    is_binary = isinstance(value, (bytes, bytearray, io.IOBase))
    is_text = isinstance(value, str)

    if not content_type:
        if is_binary:
            content_type = 'application/octet-stream'
        elif is_text:
            content_type = 'text/plain; charset=utf-8'
        else:
            content_type = 'application/json; charset=utf-8'

    needs_json_encoding = 'application/json' in content_type and not (is_binary or is_text)
    if needs_json_encoding:
        # No indentation, to keep the stored payload small.
        serialized = json.dumps(value, ensure_ascii=False, allow_nan=False, default=str)
        value = serialized.encode('utf-8')

    return value, content_type
def enum_to_value(value: Any) -> Any:
    """Unwrap an Enum member to its underlying value; pass anything else through untouched.

    Used to turn Enum instances into primitive values suitable for API transmission.

    Args:
        value: An Enum member, or a value of any other type.

    Returns:
        `value.value` when `value` is an Enum member; otherwise `value` itself.
    """
    return value.value if isinstance(value, Enum) else value
def is_retryable_error(exc: Exception) -> bool:
    """Tell whether a failed request may be retried, judging by the exception it raised.

    An exception is retryable when it is an ``InvalidResponseBodyError`` or any ``impit.HTTPError`` subclass.
    The latter represent transport-level failures (network issues, timeouts, protocol errors, body decoding
    errors) that are typically transient. HTTP status code errors are not classified here; they are handled
    separately in ``_make_request`` based on the response status code.

    Args:
        exc: The exception raised while performing the request.

    Returns:
        True if the exception is of a retryable type, False otherwise.
    """
    retryable_types = (InvalidResponseBodyError, impit.HTTPError)
    return isinstance(exc, retryable_types)
def to_safe_id(id: str) -> str:
    """Make a resource ID safe to embed in a URL path by swapping every `/` for `~`.

    Args:
        id: The resource identifier in format `resource_id` or `username/resource_id`.

    Returns:
        The identifier with all `/` characters replaced by `~`; unchanged if it contains none.
    """
    segments = id.split('/')
    return '~'.join(segments)
def response_to_dict(response: Response) -> dict:
    """Decode the JSON body of an API response and make sure it is a dictionary.

    Args:
        response: The HTTP response object from the API.

    Returns:
        The parsed response body.

    Raises:
        ValueError: If the parsed body is anything other than a dictionary.
    """
    payload = response.json()

    if not isinstance(payload, dict):
        raise ValueError(f'The response is not a dictionary. Got: {type(payload).__name__}')

    return payload
def response_to_list(response: Response) -> list:
    """Decode the JSON body of an API response and return it as a list.

    A list body is returned as is. A dictionary body is accepted too and is wrapped into a single-element list.

    Args:
        response: The HTTP response object from the API.

    Returns:
        The parsed response body as a list.

    Raises:
        ValueError: If the parsed body is neither a list nor a dictionary.
    """
    payload = response.json()

    if isinstance(payload, dict):
        # A single object is normalized to a one-item list.
        return [payload]

    if not isinstance(payload, list):
        raise ValueError(f'The response is not a list. Got: {type(payload).__name__}')

    return payload
def encode_base62(num: int) -> str:
    """Encode a non-negative integer as a base62 string.

    Digits are taken from `_BASE62_CHARSET`, most significant digit first.

    Args:
        num: The number to encode. Must not be negative.

    Returns:
        The base62-encoded string; zero encodes to the first character of the charset.

    Raises:
        ValueError: If `num` is negative. Such input would otherwise silently produce an empty string,
            which is not a valid encoding of any number.
    """
    if num < 0:
        raise ValueError(f'Cannot base62-encode a negative number: {num}')

    if num == 0:
        return _BASE62_CHARSET[0]

    # Derive the base from the alphabet so the two can never disagree.
    base = len(_BASE62_CHARSET)

    # Collect digits in a list and join once: O(n) instead of O(n^2) string concatenation.
    digits = []
    while num > 0:
        num, remainder = divmod(num, base)
        digits.append(_BASE62_CHARSET[remainder])

    # Digits were produced least significant first.
    return ''.join(reversed(digits))
def create_hmac_signature(secret_key: str, message: str) -> str:
    """Sign a message with HMAC-SHA256 and return a shortened, base62-encoded signature.

    To keep the signature short, only the first 30 hexadecimal characters of the digest are used; they are
    interpreted as a base-16 integer and re-encoded in base62.

    Args:
        secret_key: The secret key used for signing.
        message: The message to be signed.

    Returns:
        The base62-encoded signature.
    """
    mac = hmac.new(secret_key.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256)
    truncated_hex = mac.hexdigest()[:30]
    return encode_base62(int(truncated_hex, 16))
def create_storage_content_signature(
    resource_id: str,
    url_signing_secret_key: str,
    expires_in: timedelta | None = None,
    version: int = 0,
) -> str:
    """Build the signature for a signed URL of a storage resource such as a dataset or key-value store.

    The signed message is `{version}.{expires_at}.{resource_id}`, where `expires_at` is the expiration time as
    a Unix timestamp in milliseconds, or 0 for a signature that never expires. The returned token packs the
    version, the expiration timestamp and the HMAC signature together.

    Args:
        resource_id: The unique identifier of the storage resource.
        url_signing_secret_key: The secret key for signing the URL.
        expires_in: Optional expiration duration, counted from now; if None, the signature never expires.
        version: The signature version number (default: 0).

    Returns:
        The base64url-encoded signature string.
    """
    if expires_in is None:
        # 0 marks a signature without expiration.
        expires_at = 0
    else:
        now_ms = int(time.time() * 1000)
        lifetime_ms = int(to_seconds(expires_in) * 1000)
        expires_at = now_ms + lifetime_ms

    message_to_sign = f'{version}.{expires_at}.{resource_id}'
    hmac_sig = create_hmac_signature(url_signing_secret_key, message_to_sign)

    token = f'{version}.{expires_at}.{hmac_sig}'
    return urlsafe_b64encode(token.encode()).decode('utf-8')