Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-batches-retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"posthog-php": patch
---

Retain queued batches after network or timeout flush failures.
19 changes: 15 additions & 4 deletions lib/Consumer/ForkCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function getConsumer()
* Make an async request to our API. Fork a curl process, immediately send
* to the API. If debug is enabled, we wait for the response.
* @param array<int, array<string, mixed>> $messages Array of messages to send.
* @return bool Whether the request succeeded.
* @return bool|string Whether the request succeeded or a queue failure classification.
*/
public function flushBatch($messages)
{
Expand All @@ -64,7 +64,7 @@ public function flushBatch($messages)

if (0 != $exit) {
$this->handleError($exit, $output);
return false;
return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

$cmd .= " -H 'Content-Encoding: gzip'";
Expand All @@ -82,7 +82,7 @@ public function flushBatch($messages)
error_log("[PostHog][" . $this->type . "] " . $msg);
}

return false;
return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

// Send user agent in the form of {library_name}/{library_version} as per RFC 7231.
Expand All @@ -102,6 +102,17 @@ public function flushBatch($messages)
unlink($tmpfname);
}

return 0 == $exit;
if (0 == $exit) {
return true;
}

return $this->isNetworkCurlExit($exit)
? self::FLUSH_BATCH_RETRYABLE_FAILURE
: self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

private function isNetworkCurlExit($exit)
{
return in_array((int) $exit, [6, 7, 28, 35, 52, 56], true);
}
}
29 changes: 25 additions & 4 deletions lib/Consumer/LibCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public function getConsumer()
* enabled, we wait for the response
* and retry once to diminish impact on performance.
* @param array<int, array<string, mixed>> $messages Array of messages to send.
* @return bool Whether the request succeeded.
* @return bool|string Whether the request succeeded or a queue failure classification.
*/
public function flushBatch($messages)
{
Expand All @@ -65,11 +65,15 @@ public function flushBatch($messages)
error_log("[PostHog][" . $this->type . "] " . $msg);
}

return false;
return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

if ($this->compress_request) {
$payload = gzencode($payload);

if (false === $payload) {
return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}
}

$shouldVerify = $this->options['verify_batch_events_request'] ?? true;
Expand All @@ -86,10 +90,27 @@ public function flushBatch($messages)
);

if (!$shouldVerify) {
return $response->getResponse() !== false;
if ($response->getResponse() !== false) {
return true;
}

return $this->isNetworkFailure($response)
? self::FLUSH_BATCH_RETRYABLE_FAILURE
: self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

// Keep batch success semantics aligned with HttpClient retry handling and the Socket consumer.
return $response->getResponseCode() === 200;
if ($response->getResponseCode() === 200) {
return true;
}

return $this->isNetworkFailure($response)
? self::FLUSH_BATCH_RETRYABLE_FAILURE
: self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

private function isNetworkFailure($response)
{
return $response->getResponseCode() === 0 && $response->getCurlErrno() !== 0;
}
}
14 changes: 10 additions & 4 deletions lib/Consumer/Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,27 @@ public function getConsumer()
* Send a batch of queued messages.
*
* @param array<int, array<string, mixed>> $batch Batch of queued messages.
* @return bool Whether the request succeeded.
* @return bool|string Whether the request succeeded or a queue failure classification.
*/
public function flushBatch($batch)
{
$socket = $this->createSocket();

if (!$socket) {
return false;
return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}
Comment thread
marandaneto marked this conversation as resolved.

$payload = $this->payload($batch);
$payload = json_encode($payload);

$body = $this->createBody($this->host, $payload);
if (false === $body) {
return false;
return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

return $this->makeRequest($socket, $body);
return $this->makeRequest($socket, $body)
? true
: self::FLUSH_BATCH_NON_RETRYABLE_FAILURE;
}

private function createSocket()
Expand Down Expand Up @@ -198,6 +200,10 @@ private function createBody($host, $content)
if ($this->compress_request) {
$content = gzencode($content);

if (false === $content) {
return false;
}

$req .= "Content-Encoding: gzip\r\n";
}

Expand Down
15 changes: 11 additions & 4 deletions lib/HttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,15 @@ public function __construct(
* @param string $path Request path, including leading slash.
* @param string|null $payload JSON request body, or null for no body.
* @param array<int, string> $extraHeaders Additional cURL header strings.
* @param array{shouldRetry?: bool, shouldVerify?: bool, includeEtag?: bool, timeout?: int} $requestOptions
* @param array{
* shouldRetry?: bool,
* shouldVerify?: bool,
* includeEtag?: bool,
* timeout?: int
* } $requestOptions
* @return HttpResponse
*/
// phpcs:ignore Generic.Files.LineLength.TooLong
public function sendRequest(string $path, ?string $payload, array $extraHeaders = [], array $requestOptions = []): HttpResponse
{
$protocol = $this->useSsl ? "https://" : "http://";
Expand Down Expand Up @@ -153,9 +159,10 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders
// Retry uploading in these cases.
usleep($backoff * 1000);
$backoff *= 2;
} elseif ($responseCode >= 400) {
break;
} elseif ($responseCode == 0) {
} else {
// Do not retry non-5xx/non-429 responses (e.g. 4xx, 413 Payload Too Large,
// or responseCode 0 for network errors). PHP sends synchronously in the hosting
// app's request path, so broad retries would slow down the host application.
break;
}
} else {
Expand Down
16 changes: 14 additions & 2 deletions lib/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ abstract class QueueConsumer extends Consumer
protected const MAX_BATCH_PAYLOAD_SIZE = 1024 * 1024; // 1MB
protected const MAX_BATCH_PAYLOAD_SIZE_HUMAN = (self::MAX_BATCH_PAYLOAD_SIZE / 1024) . 'KB';

// flushBatch() returns this when the batch may succeed later and should remain queued.
protected const FLUSH_BATCH_RETRYABLE_FAILURE = 'retryable_failure';

// flushBatch() returns this when retrying the same batch would block the queue.
protected const FLUSH_BATCH_NON_RETRYABLE_FAILURE = 'non_retryable_failure';

protected $type = "QueueConsumer";

protected $queue;
Expand Down Expand Up @@ -129,8 +135,14 @@ public function flush()
$success = true;

while ($count > 0 && $success) {
$batch = array_splice($this->queue, 0, min($this->batch_size, $count));
$success = $this->flushBatch($batch);
$batchSize = min($this->batch_size, $count);
$batch = array_slice($this->queue, 0, $batchSize);
$result = $this->flushBatch($batch);
$success = true === $result;

if ($success || self::FLUSH_BATCH_RETRYABLE_FAILURE !== $result) {
array_splice($this->queue, 0, $batchSize);
}

$count = count($this->queue);
}
Expand Down
31 changes: 27 additions & 4 deletions test/MockedHttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class MockedHttpClient extends \PostHog\HttpClient
/** @var int Curl error number for /flags/ endpoint (for error simulation) */
private $flagsEndpointCurlErrno;

private $batchEndpointResponse;
private $batchEndpointResponseCode;
private $batchEndpointCurlErrno;

public function __construct(
string $host,
bool $useSsl = true,
Expand All @@ -37,7 +41,10 @@ public function __construct(
?string $flagEndpointEtag = null,
int $flagEndpointResponseCode = 200,
int $flagsEndpointResponseCode = 200,
int $flagsEndpointCurlErrno = 0
int $flagsEndpointCurlErrno = 0,
$batchEndpointResponse = '{"status":1}',
int $batchEndpointResponseCode = 200,
int $batchEndpointCurlErrno = 0
) {
parent::__construct(
$host,
Expand All @@ -49,12 +56,17 @@ public function __construct(
$curlTimeoutMilliseconds
);
$this->flagEndpointResponse = $flagEndpointResponse;
$this->flagsEndpointResponse = !empty($flagsEndpointResponse) ? $flagsEndpointResponse : MockedResponses::FLAGS_REQUEST;
$this->flagsEndpointResponse = !empty($flagsEndpointResponse)
? $flagsEndpointResponse
: MockedResponses::FLAGS_REQUEST;
$this->flagEndpointEtag = $flagEndpointEtag;
$this->flagEndpointResponseCode = $flagEndpointResponseCode;
$this->flagEndpointResponseQueue = null;
$this->flagsEndpointResponseCode = $flagsEndpointResponseCode;
$this->flagsEndpointCurlErrno = $flagsEndpointCurlErrno;
$this->batchEndpointResponse = $batchEndpointResponse;
$this->batchEndpointResponseCode = $batchEndpointResponseCode;
$this->batchEndpointCurlErrno = $batchEndpointCurlErrno;
}

/**
Expand All @@ -68,12 +80,18 @@ public function setFlagEndpointResponseQueue(array $responses): void
$this->flagEndpointResponseQueue = $responses;
}

// phpcs:ignore Generic.Files.LineLength.TooLong
public function sendRequest(string $path, ?string $payload, array $extraHeaders = [], array $requestOptions = []): HttpResponse
{
if (!isset($this->calls)) {
$this->calls = [];
}
array_push($this->calls, array("path" => $path, "payload" => $payload, "extraHeaders" => $extraHeaders, "requestOptions" => $requestOptions));
array_push($this->calls, array(
"path" => $path,
"payload" => $payload,
"extraHeaders" => $extraHeaders,
"requestOptions" => $requestOptions,
));

// Local evaluation endpoint: /flags/definitions?...
if (str_starts_with($path, "/flags/definitions")) {
Expand Down Expand Up @@ -115,7 +133,12 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders
}

if ($path === "/batch/") {
return new HttpResponse('{"status":1}', 200);
return new HttpResponse(
$this->batchEndpointResponse,
$this->batchEndpointResponseCode,
null,
$this->batchEndpointCurlErrno
);
}

return parent::sendRequest($path, $payload, $extraHeaders, $requestOptions);
Expand Down
Loading
Loading