-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathhttp.py
More file actions
128 lines (104 loc) · 4.08 KB
/
http.py
File metadata and controls
128 lines (104 loc) · 4.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from logging import Logger
from typing import Any, Callable, Dict, Iterator, Optional, Tuple, cast
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from urllib3 import PoolManager
from urllib3.exceptions import MaxRetryError
from urllib3.util import Retry
from ld_eventsource.errors import HTTPContentTypeError, HTTPStatusError
_CHUNK_SIZE = 10000
DynamicQueryParams = Callable[[], dict[str, str]]
"""
A callable that returns a dictionary of query parameters to add to the URL.
This can be used to modify query parameters dynamically for each connection attempt.
"""
class _HttpConnectParams:
def __init__(
self,
url: str,
headers: Optional[dict] = None,
pool: Optional[PoolManager] = None,
urllib3_request_options: Optional[dict] = None,
query_params: Optional[DynamicQueryParams] = None
):
self.__url = url
self.__headers = headers
self.__pool = pool
self.__urllib3_request_options = urllib3_request_options
self.__query_params = query_params
@property
def url(self) -> str:
return self.__url
@property
def query_params(self) -> Optional[DynamicQueryParams]:
return self.__query_params
@property
def headers(self) -> Optional[dict]:
return self.__headers
@property
def pool(self) -> Optional[PoolManager]:
return self.__pool
@property
def urllib3_request_options(self) -> Optional[dict]:
return self.__urllib3_request_options
class _HttpClientImpl:
def __init__(self, params: _HttpConnectParams, logger: Logger):
self.__params = params
self.__pool = params.pool or PoolManager()
self.__should_close_pool = params.pool is not None
self.__logger = logger
def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callable, Dict[str, Any]]:
url = self.__params.url
if self.__params.query_params is not None:
qp = self.__params.query_params()
if qp:
url_parts = list(urlsplit(url))
query = dict(parse_qsl(url_parts[3]))
query.update(qp)
url_parts[3] = urlencode(query)
url = urlunsplit(url_parts)
self.__logger.info("Connecting to stream at %s" % url)
headers = self.__params.headers.copy() if self.__params.headers else {}
headers['Cache-Control'] = 'no-cache'
headers['Accept'] = 'text/event-stream'
if last_event_id:
headers['Last-Event-ID'] = last_event_id
request_options = (
self.__params.urllib3_request_options.copy()
if self.__params.urllib3_request_options
else {}
)
request_options['headers'] = headers
try:
resp = self.__pool.request(
'GET',
url,
preload_content=False,
retries=Retry(
total=None, read=0, connect=0, status=0, other=0, redirect=3
),
**request_options
)
except MaxRetryError as e:
reason: Optional[Exception] = e.reason
if reason is not None:
raise reason # e.reason is the underlying I/O error
# Capture headers early so they're available for both error and success cases
response_headers = cast(Dict[str, Any], resp.headers)
if resp.status >= 400 or resp.status == 204:
raise HTTPStatusError(resp.status, response_headers)
content_type = resp.headers.get('Content-Type', None)
if content_type is None or not str(content_type).startswith(
"text/event-stream"
):
raise HTTPContentTypeError(content_type or '', response_headers)
stream = resp.stream(_CHUNK_SIZE)
def close():
try:
resp.shutdown()
except Exception:
pass
resp.release_conn()
return stream, close, response_headers
def close(self):
if self.__should_close_pool:
self.__pool.clear()