|
9 | 9 | import ijson |
10 | 10 | import orjson |
11 | 11 | import polars as pl |
12 | | -from httpx import Client, HTTPStatusError, TimeoutException, get |
| 12 | +from httpx import Client, get |
13 | 13 | from lxml import etree, html |
14 | 14 | from prefect.transactions import transaction |
15 | 15 | from tenacity import ( |
16 | 16 | retry, |
17 | | - retry_if_exception_type, |
18 | 17 | stop_after_attempt, |
19 | 18 | wait_exponential, |
20 | 19 | ) |
|
44 | 43 | ) |
45 | 44 |
|
46 | 45 |
|
47 | | -@retry( |
48 | | - stop=stop_after_attempt(3), |
49 | | - wait=wait_exponential(multiplier=1, min=1, max=10), |
50 | | - retry=retry_if_exception_type(httpx.HTTPError), # On ne retry que sur erreur http |
51 | | -) |
| 46 | +@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=20)) |
52 | 47 | def stream_get(url: str, chunk_size=1024**2): # chunk_size en octets (1 Mo par défaut) |
53 | 48 | if url.startswith("http"): |
54 | 49 | try: |
55 | 50 | with HTTP_CLIENT.stream( |
56 | | - "GET", url, headers=HTTP_HEADERS, follow_redirects=True |
| 51 | + "GET", url, headers=HTTP_HEADERS, follow_redirects=True, timeout=20 |
57 | 52 | ) as response: |
58 | 53 | yield from response.iter_bytes(chunk_size) |
59 | 54 | except httpx.TooManyRedirects: |
@@ -397,22 +392,19 @@ def get_etablissements() -> pl.LazyFrame: |
397 | 392 | hrefs.append(base_url + href) |
398 | 393 |
|
399 | 394 | # Fonction de traitement pour un fichier |
| 395 | + @retry( |
| 396 | + stop=stop_after_attempt(4), wait=wait_exponential(multiplier=1, min=1, max=20) |
| 397 | + ) |
400 | 398 | def get_process_file(_href: str): |
401 | | - print(_href.split("/")[-1]) |
402 | | - try: |
403 | | - response = http_client.get( |
404 | | - _href, headers=HTTP_HEADERS, timeout=20 |
405 | | - ).raise_for_status() |
406 | | - except (HTTPStatusError, TimeoutException) as err: |
407 | | - print(err) |
408 | | - print("Nouvel essai...") |
409 | | - response = http_client.get( |
410 | | - _href, headers=HTTP_HEADERS, timeout=20 |
411 | | - ).raise_for_status() |
| 399 | + response = http_client.get( |
| 400 | + _href, headers=HTTP_HEADERS, timeout=30 |
| 401 | + ).raise_for_status() |
412 | 402 |
|
413 | 403 | content = response.content |
414 | 404 | lff = pl.scan_csv(content, schema_overrides=schema) |
415 | 405 | lff = lff.select(columns) |
| 406 | + print(_href.split("/")[-1], "OK") |
| 407 | + |
416 | 408 | return lff |
417 | 409 |
|
418 | 410 | # Traitement en parallèle avec 8 threads |
|
0 commit comments