-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy pathrequest.py
More file actions
485 lines (396 loc) · 15.1 KB
/
request.py
File metadata and controls
485 lines (396 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
import json
import logging
import re
import secrets
import socket
from dataclasses import dataclass
from datetime import date, datetime
from gzip import GzipFile
from io import BytesIO
from typing import Any, List, Optional, Tuple, Union
from uuid import uuid4
import requests
from dateutil.tz import tzutc
from requests.adapters import HTTPAdapter # type: ignore[import-untyped]
from urllib3.connection import HTTPConnection
from urllib3.util.retry import Retry
from posthog.utils import remove_trailing_slash
from posthog.version import VERSION
# Socket option triples in the shape urllib3 accepts: (level, optname, value).
SocketOptions = List[Tuple[int, int, Union[int, bytes]]]
# Seconds of inactivity before the first keepalive probe is sent.
KEEPALIVE_IDLE_SECONDS = 60
# Seconds between successive keepalive probes.
KEEPALIVE_INTERVAL_SECONDS = 60
# Failed probes tolerated before the connection is considered dead.
KEEPALIVE_PROBE_COUNT = 3
# TCP keepalive probes idle connections to prevent them from being dropped.
# SO_KEEPALIVE is cross-platform, but timing options vary:
# - Linux: TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT
# - macOS: only SO_KEEPALIVE (uses system defaults)
# - Windows: TCP_KEEPIDLE, TCP_KEEPINTVL (since Windows 10 1709)
KEEP_ALIVE_SOCKET_OPTIONS: SocketOptions = list(
    HTTPConnection.default_socket_options
) + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]
# Append only the timing options the current platform's socket module exposes
# (hasattr guard keeps this working on macOS, older Windows, etc.).
for attr, value in [
    ("TCP_KEEPIDLE", KEEPALIVE_IDLE_SECONDS),
    ("TCP_KEEPINTVL", KEEPALIVE_INTERVAL_SECONDS),
    ("TCP_KEEPCNT", KEEPALIVE_PROBE_COUNT),
]:
    if hasattr(socket, attr):
        KEEP_ALIVE_SOCKET_OPTIONS.append((socket.SOL_TCP, getattr(socket, attr), value))
def _mask_tokens_in_url(url: str) -> str:
"""Mask token values in URLs for safe logging, keeping first 10 chars visible."""
return re.sub(r"(token=)([^&]{10})[^&]*", r"\1\2...", url)
@dataclass
class GetResponse:
    """Response from a GET request with ETag support.

    ``data`` holds the parsed payload on a 200 response and ``None`` on a
    304; ``etag`` carries the server's ETag header (falling back to the
    caller's etag on a 304); ``not_modified`` is True only for 304 replies.
    """

    data: Any
    etag: Optional[str] = None
    not_modified: bool = False
class HTTPAdapterWithSocketOptions(HTTPAdapter):
    """An HTTPAdapter that forwards custom socket options to its pool manager."""

    def __init__(self, *args, socket_options: Optional[SocketOptions] = None, **kwargs):
        # Store before calling super().__init__ — the base constructor
        # triggers init_poolmanager, which reads this attribute.
        self.socket_options = socket_options
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Inject the configured socket options, if any, then delegate."""
        opts = self.socket_options
        if opts is not None:
            kwargs["socket_options"] = opts
        super().init_poolmanager(*args, **kwargs)
def _build_session(socket_options: Optional[SocketOptions] = None) -> requests.Session:
    """Build a Session with retry behavior and optional socket options.

    Args:
        socket_options: Socket option triples to apply to new connections,
            or None for urllib3 defaults.

    Returns:
        A requests.Session whose connections retry transient failures
        (2 total / connect / read retries).
    """
    adapter = HTTPAdapterWithSocketOptions(
        max_retries=Retry(
            total=2,
            connect=2,
            read=2,
        ),
        socket_options=socket_options,
    )
    session = requests.sessions.Session()
    # Mount for both schemes so custom plain-http hosts (e.g. self-hosted
    # instances) get the same retry/keepalive behavior as the default
    # https endpoints; previously only https:// was covered.
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
# Shared pooled session used by default for all requests.
_session = _build_session()
# Socket options last applied via set_socket_options(); None means defaults.
_socket_options: Optional[SocketOptions] = None
# When False (see disable_connection_reuse), a fresh session is built per request.
_pooling_enabled = True
def _get_session() -> requests.Session:
    """Return the shared session, or a throwaway one when pooling is disabled."""
    if not _pooling_enabled:
        # Fresh session per request; still honors any configured socket options.
        return _build_session(_socket_options)
    return _session
def set_socket_options(socket_options: Optional[SocketOptions]) -> None:
    """
    Configure socket options for all HTTP connections.

    Example:
        from posthog import set_socket_options
        set_socket_options([(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)])
    """
    global _session, _socket_options
    # Only rebuild the shared session when the options actually change.
    if socket_options != _socket_options:
        _socket_options = socket_options
        _session = _build_session(socket_options)
def enable_keep_alive() -> None:
    """Enable TCP keepalive to prevent idle connections from being dropped.

    Applies KEEP_ALIVE_SOCKET_OPTIONS (SO_KEEPALIVE plus whatever timing
    options this platform exposes) to all future HTTP connections.
    """
    set_socket_options(KEEP_ALIVE_SOCKET_OPTIONS)
def disable_connection_reuse() -> None:
    """Disable connection reuse, creating a fresh connection for each request.

    After this call _get_session() builds a new session per request instead
    of returning the shared pooled one. No function in this module flips the
    flag back to True.
    """
    global _pooling_enabled
    _pooling_enabled = False
# PostHog Cloud ingestion endpoints; the US region is the default host.
US_INGESTION_ENDPOINT = "https://us.i.posthog.com"
EU_INGESTION_ENDPOINT = "https://eu.i.posthog.com"
DEFAULT_HOST = US_INGESTION_ENDPOINT
# Sent with every request so the server can identify this client library/version.
USER_AGENT = "posthog-python/" + VERSION
def determine_server_host(host: Optional[str]) -> str:
    """Determine the server host to use, mapping app hosts to ingestion endpoints."""
    host_or_default = host or DEFAULT_HOST
    # Known app/UI hostnames are redirected to their regional ingestion endpoint;
    # anything else passes through untouched (trailing slash preserved).
    ingestion_aliases = {
        "https://app.posthog.com": US_INGESTION_ENDPOINT,
        "https://us.posthog.com": US_INGESTION_ENDPOINT,
        "https://eu.posthog.com": EU_INGESTION_ENDPOINT,
    }
    return ingestion_aliases.get(remove_trailing_slash(host_or_default), host_or_default)
def post(
    api_key: str,
    host: Optional[str] = None,
    path=None,
    gzip: bool = False,
    timeout: int = 15,
    **kwargs,
) -> requests.Response:
    """Send `kwargs` as a JSON body to the API and return the raw response."""
    log = logging.getLogger("posthog")
    body = kwargs
    body["sentAt"] = datetime.now(tz=tzutc()).isoformat()
    url = remove_trailing_slash(host or DEFAULT_HOST) + path
    body["api_key"] = api_key
    payload = json.dumps(body, cls=DatetimeSerializer)
    log.debug("making request: %s to url: %s", payload, url)
    headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT}
    if gzip:
        headers["Content-Encoding"] = "gzip"
        compressed = BytesIO()
        with GzipFile(fileobj=compressed, mode="w") as gz:
            # json.dumps produced a str; its canonical encoding is utf-8.
            gz.write(payload.encode("utf-8"))
        payload = compressed.getvalue()
    res = _get_session().post(url, data=payload, headers=headers, timeout=timeout)
    if res.status_code == 200:
        log.debug("data uploaded successfully")
    return res
def _process_response(
res: requests.Response, success_message: str, *, return_json: bool = True
) -> Union[requests.Response, Any]:
log = logging.getLogger("posthog")
if res.status_code == 200:
log.debug(success_message)
response = res.json() if return_json else res
# Handle quota limited decide responses by raising a specific error
# NB: other services also put entries into the quotaLimited key, but right now we only care about feature flags
# since most of the other services handle quota limiting in other places in the application.
if (
isinstance(response, dict)
and "quotaLimited" in response
and isinstance(response["quotaLimited"], list)
and "feature_flags" in response["quotaLimited"]
):
log.warning(
"[FEATURE FLAGS] PostHog feature flags quota limited, resetting feature flag data. Learn more about billing limits at https://posthog.com/docs/billing/limits-alerts"
)
raise QuotaLimitError(res.status_code, "Feature flags quota limited")
return response
try:
payload = res.json()
log.debug("received response: %s", payload)
raise APIError(res.status_code, payload["detail"])
except (KeyError, ValueError):
raise APIError(res.status_code, res.text)
def decide(
    api_key: str,
    host: Optional[str] = None,
    gzip: bool = False,
    timeout: int = 15,
    **kwargs,
) -> Any:
    """Post the `kwargs` to the `/decide` API endpoint (v4) and return the parsed JSON.

    Raises APIError (or QuotaLimitError) via _process_response on failure.
    """
    res = post(api_key, host, "/decide/?v=4", gzip, timeout, **kwargs)
    return _process_response(res, success_message="Feature flags decided successfully")
def flags(
    api_key: str,
    host: Optional[str] = None,
    gzip: bool = False,
    timeout: int = 15,
    **kwargs,
) -> Any:
    """Post the `kwargs` to the `/flags` API endpoint (v2) and return the parsed JSON.

    Raises APIError (or QuotaLimitError) via _process_response on failure.
    """
    res = post(api_key, host, "/flags/?v=2", gzip, timeout, **kwargs)
    return _process_response(
        res, success_message="Feature flags evaluated successfully"
    )
def remote_config(
    personal_api_key: str,
    project_api_key: str,
    host: Optional[str] = None,
    key: str = "",
    timeout: int = 15,
) -> Any:
    """Get remote config flag value from the remote_config API endpoint.

    Authenticates with the personal API key (sent as a Bearer header by
    get()); the project API key travels as a `token` query parameter, which
    get() masks when logging the URL. Returns the response payload only.
    """
    response = get(
        personal_api_key,
        f"/api/projects/@current/feature_flags/{key}/remote_config?token={project_api_key}",
        host,
        timeout,
    )
    return response.data
def batch_post(
    api_key: str,
    host: Optional[str] = None,
    gzip: bool = False,
    timeout: int = 15,
    **kwargs,
) -> requests.Response:
    """Post the `kwargs` to the batch API endpoint for events.

    Returns the raw response (return_json=False); _process_response raises
    APIError for non-200 statuses.
    """
    res = post(api_key, host, "/batch/", gzip, timeout, **kwargs)
    return _process_response(
        res, success_message="data uploaded successfully", return_json=False
    )
def get(
    api_key: str,
    url: str,
    host: Optional[str] = None,
    timeout: Optional[int] = None,
    etag: Optional[str] = None,
) -> GetResponse:
    """
    Perform an authenticated GET request with optional ETag support.

    When `etag` is supplied it is sent as an If-None-Match header. Returns a
    GetResponse whose `not_modified` flag is True (with data=None) on a 304,
    or False (with the parsed payload) on a 200.
    """
    log = logging.getLogger("posthog")
    target = remove_trailing_slash(host or DEFAULT_HOST) + url
    request_headers = {
        "Authorization": "Bearer %s" % api_key,
        "User-Agent": USER_AGENT,
    }
    if etag:
        request_headers["If-None-Match"] = etag
    res = _get_session().get(target, headers=request_headers, timeout=timeout)
    # Token values are masked before the URL reaches any log line.
    safe_url = _mask_tokens_in_url(target)
    fresh_etag = res.headers.get("ETag")
    if res.status_code == 304:
        # Content unchanged server-side; keep the caller's etag when the
        # server did not send one back.
        log.debug(f"GET {safe_url} returned 304 Not Modified")
        return GetResponse(data=None, etag=fresh_etag or etag, not_modified=True)
    payload = _process_response(
        res, success_message=f"GET {safe_url} completed successfully"
    )
    return GetResponse(data=payload, etag=fresh_etag, not_modified=False)
class APIError(Exception):
    """Error raised when the PostHog API returns a non-success response."""

    def __init__(self, status: Union[int, str], message: str):
        self.message = message
        self.status = status

    def __str__(self):
        return "[PostHog] {0} ({1})".format(self.message, self.status)
class QuotaLimitError(APIError):
    """Raised when the API response reports feature flags as quota limited."""

    pass
class DatetimeSerializer(json.JSONEncoder):
    """JSON encoder that renders date/datetime values as ISO-8601 strings."""

    def default(self, obj: Any):
        if isinstance(obj, (date, datetime)):
            return obj.isoformat()
        # Everything else follows the standard encoder's rules (raises TypeError).
        return super().default(obj)
def build_ai_multipart_request(
    event_name: str,
    distinct_id: str,
    properties: dict[str, Any],
    blobs: dict[str, Any],
    timestamp: Optional[str] = None,
    event_uuid: Optional[str] = None,
) -> tuple[bytes, str]:
    """
    Build a multipart/form-data request body for AI events.

    Args:
        event_name: The event name (e.g., "$ai_generation")
        distinct_id: The distinct ID for the event
        properties: Event properties (without blob properties)
        blobs: Dictionary of blob properties to include as separate parts
        timestamp: Optional timestamp for the event
        event_uuid: Optional UUID for the event

    Returns:
        Tuple of (body_bytes, boundary) for the multipart request

    Format follows the /i/v0/ai endpoint spec:
        Part 1: "event" - JSON with {uuid, event, distinct_id, timestamp}
        Part 2: "event.properties" - JSON with non-blob properties
        Part 3+: "event.properties.$ai_input" etc. - Blob data as JSON
    """
    # Random boundary: unlikely to collide with anything inside the payload.
    boundary = "----WebKitFormBoundary" + secrets.token_hex(16)
    delimiter = f"--{boundary}\r\n".encode()

    def _json_part(disposition: bytes, payload: Any) -> list:
        # One JSON-typed multipart section: delimiter, headers, body, CRLF.
        return [
            delimiter,
            disposition,
            b"Content-Type: application/json\r\n\r\n",
            json.dumps(payload, cls=DatetimeSerializer).encode("utf-8"),
            b"\r\n",
        ]

    event_data = {
        "uuid": event_uuid if event_uuid is not None else str(uuid4()),
        "event": event_name,
        "distinct_id": distinct_id,
    }
    if timestamp is not None:
        event_data["timestamp"] = timestamp

    chunks: list = []
    # Part 1: the event envelope.
    chunks += _json_part(b'Content-Disposition: form-data; name="event"\r\n', event_data)
    # Part 2: the non-blob properties.
    chunks += _json_part(
        b'Content-Disposition: form-data; name="event.properties"\r\n', properties
    )
    # Part 3+: one part per blob, each with a freshly generated filename.
    for blob_name, blob_value in blobs.items():
        disposition = (
            f'Content-Disposition: form-data; name="event.properties.{blob_name}"; '
            f'filename="{uuid4()}"\r\n'
        ).encode()
        chunks += _json_part(disposition, blob_value)
    # Closing boundary terminates the multipart body.
    chunks.append(f"--{boundary}--\r\n".encode())
    return b"".join(chunks), boundary
def ai_post(
    api_key: str,
    host: Optional[str] = None,
    gzip: bool = False,
    timeout: int = 15,
    **kwargs,
) -> requests.Response:
    """
    Post an AI event to the /i/v0/ai endpoint using multipart/form-data.

    Args:
        api_key: The PostHog API key
        host: The host to post to
        gzip: Whether to gzip compress the request
        timeout: Request timeout in seconds
        **kwargs: Event parameters including event_name, distinct_id, properties, blobs, etc.

    Returns:
        The response from the server

    Raises:
        APIError: If the request fails
    """
    log = logging.getLogger("posthog")

    # Extract event parameters
    event_name = kwargs.get("event_name")
    distinct_id = kwargs.get("distinct_id")
    properties = kwargs.get("properties", {})
    blobs = kwargs.get("blobs", {})
    timestamp = kwargs.get("timestamp")
    event_uuid = kwargs.get("uuid")

    # Build multipart request
    body, boundary = build_ai_multipart_request(
        event_name=event_name,
        distinct_id=distinct_id,
        properties=properties,
        blobs=blobs,
        timestamp=timestamp,
        event_uuid=event_uuid,
    )

    data = body
    headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        "Authorization": f"Bearer {api_key}",
        "User-Agent": USER_AGENT,
    }

    # Spec recommends compression for requests > 10KB, so compress large
    # bodies even when the caller did not request gzip explicitly.
    if gzip or len(body) > 10 * 1024:
        headers["Content-Encoding"] = "gzip"
        buf = BytesIO()
        with GzipFile(fileobj=buf, mode="w") as gz:
            gz.write(body)
        data = buf.getvalue()
        log.debug("Compressed AI event from %d bytes to %d bytes", len(body), len(data))

    url = remove_trailing_slash(host or DEFAULT_HOST) + "/i/v0/ai"
    log.debug("Posting AI event to %s", url)
    log.debug(
        "Event: %s, Distinct ID: %s, Blobs: %s",
        event_name,
        distinct_id,
        list(blobs.keys()),
    )

    # Use _get_session() rather than the module-level _session directly so
    # that disable_connection_reuse() and socket-option changes apply to AI
    # events too, consistent with post() and get().
    res = _get_session().post(url, data=data, headers=headers, timeout=timeout)

    if res.status_code == 200:
        log.debug("AI event uploaded successfully")
        return res

    # Handle errors: surface the server's "detail" message when available,
    # falling back to the raw body when the response is not JSON.
    try:
        payload = res.json()
        log.debug("Received error response: %s", payload)
        raise APIError(res.status_code, payload.get("detail", "Unknown error"))
    except (KeyError, ValueError):
        raise APIError(res.status_code, res.text)