From f0fd5467786fd11c329a978e9326edb3ebe4ff79 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 17:42:57 +0200 Subject: [PATCH 1/3] fix: retain events after network flush failures --- .changeset/quiet-batches-retry.md | 5 ++ lib/Consumer/ForkCurl.php | 19 +++- lib/Consumer/LibCurl.php | 29 ++++++- lib/Consumer/Socket.php | 14 ++- lib/HttpClient.php | 13 ++- lib/QueueConsumer.php | 16 +++- test/MockedHttpClient.php | 31 ++++++- test/QueueConsumerTest.php | 135 +++++++++++++++++++++++++++++ test/QueueConsumerTestConsumer.php | 49 +++++++++++ 9 files changed, 292 insertions(+), 19 deletions(-) create mode 100644 .changeset/quiet-batches-retry.md create mode 100644 test/QueueConsumerTest.php create mode 100644 test/QueueConsumerTestConsumer.php diff --git a/.changeset/quiet-batches-retry.md b/.changeset/quiet-batches-retry.md new file mode 100644 index 0000000..b839f87 --- /dev/null +++ b/.changeset/quiet-batches-retry.md @@ -0,0 +1,5 @@ +--- +"posthog-php": patch +--- + +Retain queued batches after network or timeout flush failures. diff --git a/lib/Consumer/ForkCurl.php b/lib/Consumer/ForkCurl.php index 23fb965..9d83aa2 100644 --- a/lib/Consumer/ForkCurl.php +++ b/lib/Consumer/ForkCurl.php @@ -38,7 +38,7 @@ public function getConsumer() * Make an async request to our API. Fork a curl process, immediately send * to the API. If debug is enabled, we wait for the response. * @param array> $messages Array of messages to send. - * @return bool Whether the request succeeded. + * @return bool|string Whether the request succeeded or a queue failure classification. */ public function flushBatch($messages) { @@ -64,7 +64,7 @@ public function flushBatch($messages) if (0 != $exit) { $this->handleError($exit, $output); - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } $cmd .= " -H 'Content-Encoding: gzip'"; @@ -82,7 +82,7 @@ public function flushBatch($messages) error_log("[PostHog][" . $this->type . "] " . $msg); } - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } // Send user agent in the form of {library_name}/{library_version} as per RFC 7231. @@ -104,6 +104,17 @@ public function flushBatch($messages) unlink($tmpfname); } - return 0 == $exit; + if (0 == $exit) { + return true; + } + + return $this->isNetworkCurlExit($exit) + ? self::FLUSH_BATCH_RETRYABLE_FAILURE + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } + + private function isNetworkCurlExit($exit) + { + return in_array((int) $exit, [6, 7, 28, 35, 52, 56], true); } } diff --git a/lib/Consumer/LibCurl.php b/lib/Consumer/LibCurl.php index f07e22e..2e47212 100644 --- a/lib/Consumer/LibCurl.php +++ b/lib/Consumer/LibCurl.php @@ -52,7 +52,7 @@ public function getConsumer() * enabled, we wait for the response * and retry once to diminish impact on performance. * @param array> $messages Array of messages to send. - * @return bool Whether the request succeeded. + * @return bool|string Whether the request succeeded or a queue failure classification. */ public function flushBatch($messages) { @@ -65,11 +65,15 @@ public function flushBatch($messages) error_log("[PostHog][" . $this->type . "] " . $msg); } - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } if ($this->compress_request) { $payload = gzencode($payload); + + if (false === $payload) { + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } } $shouldVerify = $this->options['verify_batch_events_request'] ?? true; @@ -86,10 +90,27 @@ public function flushBatch($messages) ); if (!$shouldVerify) { - return $response->getResponse() !== false; + if ($response->getResponse() !== false) { + return true; + } + + return $this->isNetworkFailure($response) + ? self::FLUSH_BATCH_RETRYABLE_FAILURE + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } // Keep batch success semantics aligned with HttpClient retry handling and the Socket consumer. - return $response->getResponseCode() === 200; + if ($response->getResponseCode() === 200) { + return true; + } + + return $this->isNetworkFailure($response) + ? self::FLUSH_BATCH_RETRYABLE_FAILURE + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } + + private function isNetworkFailure($response) + { + return $response->getResponseCode() === 0 && $response->getCurlErrno() !== 0; } } diff --git a/lib/Consumer/Socket.php b/lib/Consumer/Socket.php index c2845ae..a6f0a2c 100644 --- a/lib/Consumer/Socket.php +++ b/lib/Consumer/Socket.php @@ -43,14 +43,14 @@ public function getConsumer() * Send a batch of queued messages. * * @param array> $batch Batch of queued messages. - * @return bool Whether the request succeeded. + * @return bool|string Whether the request succeeded or a queue failure classification. */ public function flushBatch($batch) { $socket = $this->createSocket(); if (!$socket) { - return false; + return self::FLUSH_BATCH_RETRYABLE_FAILURE; } $payload = $this->payload($batch); @@ -58,10 +58,12 @@ public function flushBatch($batch) $body = $this->createBody($this->host, $payload); if (false === $body) { - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } - return $this->makeRequest($socket, $body); + return $this->makeRequest($socket, $body) + ? true + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } private function createSocket() @@ -201,6 +203,10 @@ private function createBody($host, $content) if ($this->compress_request) { $content = gzencode($content); + if (false === $content) { + return false; + } + $req .= "Content-Encoding: gzip\r\n"; } diff --git a/lib/HttpClient.php b/lib/HttpClient.php index 60c570d..2190314 100644 --- a/lib/HttpClient.php +++ b/lib/HttpClient.php @@ -80,9 +80,15 @@ public function __construct( * @param string $path Request path, including leading slash. * @param string|null $payload JSON request body, or null for no body. * @param array $extraHeaders Additional cURL header strings. - * @param array{shouldRetry?: bool, shouldVerify?: bool, includeEtag?: bool, timeout?: int} $requestOptions + * @param array{ + * shouldRetry?: bool, + * shouldVerify?: bool, + * includeEtag?: bool, + * timeout?: int + * } $requestOptions * @return HttpResponse */ + // phpcs:ignore Generic.Files.LineLength.TooLong public function sendRequest(string $path, ?string $payload, array $extraHeaders = [], array $requestOptions = []): HttpResponse { $protocol = $this->useSsl ? "https://" : "http://"; @@ -154,9 +160,14 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders usleep($backoff * 1000); $backoff *= 2; } elseif ($responseCode >= 400) { + // Do not retry every non-2xx/3xx response (notably 413 Payload Too Large). + // PHP sends synchronously in the hosting app's request path, so broad retries + // would slow down the host application. break; } elseif ($responseCode == 0) { break; + } else { + break; } } else { break; // no error diff --git a/lib/QueueConsumer.php b/lib/QueueConsumer.php index 9c5bc42..f330b2a 100644 --- a/lib/QueueConsumer.php +++ b/lib/QueueConsumer.php @@ -12,6 +12,12 @@ abstract class QueueConsumer extends Consumer protected const MAX_BATCH_PAYLOAD_SIZE = 1024 * 1024; // 1MB protected const MAX_BATCH_PAYLOAD_SIZE_HUMAN = (self::MAX_BATCH_PAYLOAD_SIZE / 1024) . 'KB'; + // flushBatch() returns this when the batch may succeed later and should remain queued. + protected const FLUSH_BATCH_RETRYABLE_FAILURE = 'retryable_failure'; + + // flushBatch() returns this when retrying the same batch would block the queue. + protected const FLUSH_BATCH_NON_RETRYABLE_FAILURE = 'non_retryable_failure'; + protected $type = "QueueConsumer"; protected $queue; @@ -111,8 +117,14 @@ public function flush() $success = true; while ($count > 0 && $success) { - $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); - $success = $this->flushBatch($batch); + $batchSize = min($this->batch_size, $count); + $batch = array_slice($this->queue, 0, $batchSize); + $result = $this->flushBatch($batch); + $success = true === $result; + + if ($success || self::FLUSH_BATCH_RETRYABLE_FAILURE !== $result) { + array_splice($this->queue, 0, $batchSize); + } $count = count($this->queue); } diff --git a/test/MockedHttpClient.php b/test/MockedHttpClient.php index 0a4dd6c..ccce706 100644 --- a/test/MockedHttpClient.php +++ b/test/MockedHttpClient.php @@ -24,6 +24,10 @@ class MockedHttpClient extends \PostHog\HttpClient /** @var int Curl error number for /flags/ endpoint (for error simulation) */ private $flagsEndpointCurlErrno; + private $batchEndpointResponse; + private $batchEndpointResponseCode; + private $batchEndpointCurlErrno; + public function __construct( string $host, bool $useSsl = true, @@ -37,7 +41,10 @@ public function __construct( ?string $flagEndpointEtag = null, int $flagEndpointResponseCode = 200, int $flagsEndpointResponseCode = 200, - int $flagsEndpointCurlErrno = 0 + int $flagsEndpointCurlErrno = 0, + $batchEndpointResponse = '{"status":1}', + int $batchEndpointResponseCode = 200, + int $batchEndpointCurlErrno = 0 ) { parent::__construct( $host, @@ -49,12 +56,17 @@ public function __construct( $curlTimeoutMilliseconds ); $this->flagEndpointResponse = $flagEndpointResponse; - $this->flagsEndpointResponse = !empty($flagsEndpointResponse) ? $flagsEndpointResponse : MockedResponses::FLAGS_REQUEST; + $this->flagsEndpointResponse = !empty($flagsEndpointResponse) + ? $flagsEndpointResponse + : MockedResponses::FLAGS_REQUEST; $this->flagEndpointEtag = $flagEndpointEtag; $this->flagEndpointResponseCode = $flagEndpointResponseCode; $this->flagEndpointResponseQueue = null; $this->flagsEndpointResponseCode = $flagsEndpointResponseCode; $this->flagsEndpointCurlErrno = $flagsEndpointCurlErrno; + $this->batchEndpointResponse = $batchEndpointResponse; + $this->batchEndpointResponseCode = $batchEndpointResponseCode; + $this->batchEndpointCurlErrno = $batchEndpointCurlErrno; } /** @@ -68,12 +80,18 @@ public function setFlagEndpointResponseQueue(array $responses): void $this->flagEndpointResponseQueue = $responses; } + // phpcs:ignore Generic.Files.LineLength.TooLong public function sendRequest(string $path, ?string $payload, array $extraHeaders = [], array $requestOptions = []): HttpResponse { if (!isset($this->calls)) { $this->calls = []; } - array_push($this->calls, array("path" => $path, "payload" => $payload, "extraHeaders" => $extraHeaders, "requestOptions" => $requestOptions)); + array_push($this->calls, array( + "path" => $path, + "payload" => $payload, + "extraHeaders" => $extraHeaders, + "requestOptions" => $requestOptions, + )); // Local evaluation endpoint: /flags/definitions?... if (str_starts_with($path, "/flags/definitions")) { @@ -115,7 +133,12 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders } if ($path === "/batch/") { - return new HttpResponse('{"status":1}', 200); + return new HttpResponse( + $this->batchEndpointResponse, + $this->batchEndpointResponseCode, + null, + $this->batchEndpointCurlErrno + ); } return parent::sendRequest($path, $payload, $extraHeaders, $requestOptions); diff --git a/test/QueueConsumerTest.php b/test/QueueConsumerTest.php new file mode 100644 index 0000000..82be0f2 --- /dev/null +++ b/test/QueueConsumerTest.php @@ -0,0 +1,135 @@ +message('first'); + $second = $this->message('second'); + $consumer = new QueueConsumerTestConsumer( + [QueueConsumerTestConsumer::retryableFailure()], + ['batch_size' => 2] + ); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + + $this->assertSame([[$first, $second]], $consumer->flushedBatches); + $this->assertSame([$first, $second], $consumer->queuedItems()); + } + + public function testNonRetryableFlushFailureDropsBatch(): void + { + $first = $this->message('first'); + $second = $this->message('second'); + $consumer = new QueueConsumerTestConsumer( + [QueueConsumerTestConsumer::nonRetryableFailure()], + ['batch_size' => 2] + ); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + + $this->assertSame([[$first, $second]], $consumer->flushedBatches); + $this->assertSame([], $consumer->queuedItems()); + } + + public function testFalseFlushBatchResultStillDropsBatchForCompatibility(): void + { + $first = $this->message('first'); + $second = $this->message('second'); + $consumer = new QueueConsumerTestConsumer([false], ['batch_size' => 2]); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + + $this->assertSame([], $consumer->queuedItems()); + } + + public function testRetainedFailedBatchIsRetriedBeforeNewerEvents(): void + { + $first = $this->message('first'); + $second = $this->message('second'); + $third = $this->message('third'); + $consumer = new QueueConsumerTestConsumer( + [QueueConsumerTestConsumer::retryableFailure(), true], + ['batch_size' => 2] + ); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + $this->assertTrue($consumer->enqueue($third)); + + $this->assertSame([[$first, $second], [$first, $second], [$third]], $consumer->flushedBatches); + $this->assertSame([], $consumer->queuedItems()); + } + + public function testLibCurlNetworkFailureKeepsBatchQueued(): void + { + $message = $this->message('network-failure'); + $httpClient = new MockedHttpClient( + 'app.posthog.com', + batchEndpointResponse: false, + batchEndpointResponseCode: 0, + batchEndpointCurlErrno: 28 + ); + $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); + + $this->assertFalse($consumer->capture($message)); + + $this->assertSame([$message], $this->queuedItems($consumer)); + } + + public function testLibCurlHttpFailureDropsBatch(): void + { + $message = $this->message('http-failure'); + $httpClient = new MockedHttpClient( + 'app.posthog.com', + batchEndpointResponse: '{"status":0}', + batchEndpointResponseCode: 500 + ); + $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); + + $this->assertFalse($consumer->capture($message)); + + $this->assertSame([], $this->queuedItems($consumer)); + } + + public function testLibCurlPayloadTooLargeDropsBatch(): void + { + $message = $this->message('payload-too-large'); + $httpClient = new MockedHttpClient( + 'app.posthog.com', + batchEndpointResponse: '{"status":0}', + batchEndpointResponseCode: 413 + ); + $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); + + $this->assertFalse($consumer->capture($message)); + + $this->assertSame([], $this->queuedItems($consumer)); + } + + private function message(string $event): array + { + return [ + 'event' => $event, + 'library' => 'posthog-php', + 'library_version' => 'test', + ]; + } + + private function queuedItems($consumer): array + { + $reflection = new \ReflectionClass(QueueConsumer::class); + $queueProperty = $reflection->getProperty('queue'); + + return $queueProperty->getValue($consumer); + } +} diff --git a/test/QueueConsumerTestConsumer.php b/test/QueueConsumerTestConsumer.php new file mode 100644 index 0000000..d40ea14 --- /dev/null +++ b/test/QueueConsumerTestConsumer.php @@ -0,0 +1,49 @@ +flushResults = $flushResults; + parent::__construct('test-key', $options); + } + + public function __destruct() + { + // Avoid destructor-triggered flushes changing assertions in tests. + } + + public static function retryableFailure() + { + return self::FLUSH_BATCH_RETRYABLE_FAILURE; + } + + public static function nonRetryableFailure() + { + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } + + public function queuedItems(): array + { + return $this->queue; + } + + public function flushBatch($batch) + { + $this->flushedBatches[] = $batch; + + if ([] === $this->flushResults) { + return true; + } + + return array_shift($this->flushResults); + } +} From 26a1c0c50a367acc8b8d4a1e055dbcd778de02b1 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Wed, 17 Jun 2026 18:23:55 +0200 Subject: [PATCH 2/3] address pr review feedback --- lib/HttpClient.php | 10 +++---- test/QueueConsumerTest.php | 54 ++++++++++++++------------------------ 2 files changed, 23 insertions(+), 41 deletions(-) diff --git a/lib/HttpClient.php b/lib/HttpClient.php index 2190314..bbf78fd 100644 --- a/lib/HttpClient.php +++ b/lib/HttpClient.php @@ -159,14 +159,10 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders // Retry uploading in these cases. usleep($backoff * 1000); $backoff *= 2; - } elseif ($responseCode >= 400) { - // Do not retry every non-2xx/3xx response (notably 413 Payload Too Large). - // PHP sends synchronously in the hosting app's request path, so broad retries - // would slow down the host application. - break; - } elseif ($responseCode == 0) { - break; } else { + // Do not retry non-5xx/non-429 responses (e.g. 4xx, 413 Payload Too Large, + // or responseCode 0 for network errors). PHP sends synchronously in the hosting + // app's request path, so broad retries would slow down the host application. break; } } else { diff --git a/test/QueueConsumerTest.php b/test/QueueConsumerTest.php index 82be0f2..c16413d 100644 --- a/test/QueueConsumerTest.php +++ b/test/QueueConsumerTest.php @@ -2,6 +2,7 @@ namespace PostHog\Test; +use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\TestCase; use PostHog\Consumer\LibCurl; use PostHog\QueueConsumer; @@ -70,50 +71,35 @@ public function testRetainedFailedBatchIsRetriedBeforeNewerEvents(): void $this->assertSame([], $consumer->queuedItems()); } - public function testLibCurlNetworkFailureKeepsBatchQueued(): void - { - $message = $this->message('network-failure'); - $httpClient = new MockedHttpClient( - 'app.posthog.com', - batchEndpointResponse: false, - batchEndpointResponseCode: 0, - batchEndpointCurlErrno: 28 - ); - $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); - - $this->assertFalse($consumer->capture($message)); - - $this->assertSame([$message], $this->queuedItems($consumer)); - } - - public function testLibCurlHttpFailureDropsBatch(): void - { - $message = $this->message('http-failure'); + #[DataProvider('libCurlFailureQueueBehaviorCases')] + public function testLibCurlFailureQueueBehavior( + string $event, + $batchEndpointResponse, + int $batchEndpointResponseCode, + int $batchEndpointCurlErrno, + bool $shouldKeepQueued + ): void { + $message = $this->message($event); $httpClient = new MockedHttpClient( 'app.posthog.com', - batchEndpointResponse: '{"status":0}', - batchEndpointResponseCode: 500 + batchEndpointResponse: $batchEndpointResponse, + batchEndpointResponseCode: $batchEndpointResponseCode, + batchEndpointCurlErrno: $batchEndpointCurlErrno ); $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); $this->assertFalse($consumer->capture($message)); - $this->assertSame([], $this->queuedItems($consumer)); + $this->assertSame($shouldKeepQueued ? [$message] : [], $this->queuedItems($consumer)); } - public function testLibCurlPayloadTooLargeDropsBatch(): void + public static function libCurlFailureQueueBehaviorCases(): array { - $message = $this->message('payload-too-large'); - $httpClient = new MockedHttpClient( - 'app.posthog.com', - batchEndpointResponse: '{"status":0}', - batchEndpointResponseCode: 413 - ); - $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); - - $this->assertFalse($consumer->capture($message)); - - $this->assertSame([], $this->queuedItems($consumer)); + return [ + 'network failure keeps batch queued' => ['network-failure', false, 0, 28, true], + 'http failure drops batch' => ['http-failure', '{"status":0}', 500, 0, false], + 'payload too large drops batch' => ['payload-too-large', '{"status":0}', 413, 0, false], + ]; } private function message(string $event): array From 606324a217858b11fc90739c494930442cbaa695 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Thu, 18 Jun 2026 20:30:42 +0200 Subject: [PATCH 3/3] address socket flush feedback --- lib/Consumer/Socket.php | 2 +- test/QueueConsumerTest.php | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/lib/Consumer/Socket.php b/lib/Consumer/Socket.php index b958250..b4e18e8 100644 --- a/lib/Consumer/Socket.php +++ b/lib/Consumer/Socket.php @@ -50,7 +50,7 @@ public function flushBatch($batch) $socket = $this->createSocket(); if (!$socket) { - return self::FLUSH_BATCH_RETRYABLE_FAILURE; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } $payload = $this->payload($batch); diff --git a/test/QueueConsumerTest.php b/test/QueueConsumerTest.php index c16413d..0411813 100644 --- a/test/QueueConsumerTest.php +++ b/test/QueueConsumerTest.php @@ -5,6 +5,7 @@ use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\TestCase; use PostHog\Consumer\LibCurl; +use PostHog\Consumer\Socket; use PostHog\QueueConsumer; class QueueConsumerTest extends TestCase @@ -53,6 +54,24 @@ public function testFalseFlushBatchResultStillDropsBatchForCompatibility(): void $this->assertSame([], $consumer->queuedItems()); } + public function testSocketConnectionFailureDropsBatch(): void + { + $message = $this->message('socket-connection-failure'); + $consumer = new Socket( + 'test-key', + [ + 'batch_size' => 1, + 'host' => 'invalid.invalid', + 'ssl' => false, + 'timeout' => 0.01, + ] + ); + + $this->assertFalse($consumer->capture($message)); + + $this->assertSame([], $this->queuedItems($consumer)); + } + public function testRetainedFailedBatchIsRetriedBeforeNewerEvents(): void { $first = $this->message('first');