diff --git a/.changeset/quiet-batches-retry.md b/.changeset/quiet-batches-retry.md new file mode 100644 index 0000000..b839f87 --- /dev/null +++ b/.changeset/quiet-batches-retry.md @@ -0,0 +1,5 @@ +--- +"posthog-php": patch +--- + +Retain queued batches after network or timeout flush failures. diff --git a/lib/Consumer/ForkCurl.php b/lib/Consumer/ForkCurl.php index 826ccbd..4e91b91 100644 --- a/lib/Consumer/ForkCurl.php +++ b/lib/Consumer/ForkCurl.php @@ -38,7 +38,7 @@ public function getConsumer() * Make an async request to our API. Fork a curl process, immediately send * to the API. If debug is enabled, we wait for the response. * @param array> $messages Array of messages to send. - * @return bool Whether the request succeeded. + * @return bool|string Whether the request succeeded or a queue failure classification. */ public function flushBatch($messages) { @@ -64,7 +64,7 @@ public function flushBatch($messages) if (0 != $exit) { $this->handleError($exit, $output); - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } $cmd .= " -H 'Content-Encoding: gzip'"; @@ -82,7 +82,7 @@ public function flushBatch($messages) error_log("[PostHog][" . $this->type . "] " . $msg); } - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } // Send user agent in the form of {library_name}/{library_version} as per RFC 7231. @@ -102,6 +102,17 @@ public function flushBatch($messages) unlink($tmpfname); } - return 0 == $exit; + if (0 == $exit) { + return true; + } + + return $this->isNetworkCurlExit($exit) + ? self::FLUSH_BATCH_RETRYABLE_FAILURE + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } + + private function isNetworkCurlExit($exit) + { + return in_array((int) $exit, [6, 7, 28, 35, 52, 56], true); } } diff --git a/lib/Consumer/LibCurl.php b/lib/Consumer/LibCurl.php index c95520e..343fb33 100644 --- a/lib/Consumer/LibCurl.php +++ b/lib/Consumer/LibCurl.php @@ -52,7 +52,7 @@ public function getConsumer() * enabled, we wait for the response * and retry once to diminish impact on performance. * @param array> $messages Array of messages to send. - * @return bool Whether the request succeeded. + * @return bool|string Whether the request succeeded or a queue failure classification. */ public function flushBatch($messages) { @@ -65,11 +65,15 @@ public function flushBatch($messages) error_log("[PostHog][" . $this->type . "] " . $msg); } - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } if ($this->compress_request) { $payload = gzencode($payload); + + if (false === $payload) { + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } } $shouldVerify = $this->options['verify_batch_events_request'] ?? true; @@ -86,10 +90,27 @@ public function flushBatch($messages) ); if (!$shouldVerify) { - return $response->getResponse() !== false; + if ($response->getResponse() !== false) { + return true; + } + + return $this->isNetworkFailure($response) + ? self::FLUSH_BATCH_RETRYABLE_FAILURE + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } // Keep batch success semantics aligned with HttpClient retry handling and the Socket consumer. - return $response->getResponseCode() === 200; + if ($response->getResponseCode() === 200) { + return true; + } + + return $this->isNetworkFailure($response) + ? self::FLUSH_BATCH_RETRYABLE_FAILURE + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } + + private function isNetworkFailure($response) + { + return $response->getResponseCode() === 0 && $response->getCurlErrno() !== 0; } } diff --git a/lib/Consumer/Socket.php b/lib/Consumer/Socket.php index 2b40185..b4e18e8 100644 --- a/lib/Consumer/Socket.php +++ b/lib/Consumer/Socket.php @@ -43,14 +43,14 @@ public function getConsumer() * Send a batch of queued messages. * * @param array> $batch Batch of queued messages. - * @return bool Whether the request succeeded. + * @return bool|string Whether the request succeeded or a queue failure classification. */ public function flushBatch($batch) { $socket = $this->createSocket(); if (!$socket) { - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } $payload = $this->payload($batch); @@ -58,10 +58,12 @@ public function flushBatch($batch) $body = $this->createBody($this->host, $payload); if (false === $body) { - return false; + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } - return $this->makeRequest($socket, $body); + return $this->makeRequest($socket, $body) + ? true + : self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; } private function createSocket() @@ -198,6 +200,10 @@ private function createBody($host, $content) if ($this->compress_request) { $content = gzencode($content); + if (false === $content) { + return false; + } + $req .= "Content-Encoding: gzip\r\n"; } diff --git a/lib/HttpClient.php b/lib/HttpClient.php index 60c570d..bbf78fd 100644 --- a/lib/HttpClient.php +++ b/lib/HttpClient.php @@ -80,9 +80,15 @@ public function __construct( * @param string $path Request path, including leading slash. * @param string|null $payload JSON request body, or null for no body. * @param array $extraHeaders Additional cURL header strings. - * @param array{shouldRetry?: bool, shouldVerify?: bool, includeEtag?: bool, timeout?: int} $requestOptions + * @param array{ + * shouldRetry?: bool, + * shouldVerify?: bool, + * includeEtag?: bool, + * timeout?: int + * } $requestOptions * @return HttpResponse */ + // phpcs:ignore Generic.Files.LineLength.TooLong public function sendRequest(string $path, ?string $payload, array $extraHeaders = [], array $requestOptions = []): HttpResponse { $protocol = $this->useSsl ? "https://" : "http://"; @@ -153,9 +159,10 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders // Retry uploading in these cases. usleep($backoff * 1000); $backoff *= 2; - } elseif ($responseCode >= 400) { - break; - } elseif ($responseCode == 0) { + } else { + // Do not retry non-5xx/non-429 responses (e.g. 4xx, 413 Payload Too Large, + // or responseCode 0 for network errors). PHP sends synchronously in the hosting + // app's request path, so broad retries would slow down the host application. break; } } else { diff --git a/lib/QueueConsumer.php b/lib/QueueConsumer.php index c072bbc..9f522f8 100644 --- a/lib/QueueConsumer.php +++ b/lib/QueueConsumer.php @@ -18,6 +18,12 @@ abstract class QueueConsumer extends Consumer protected const MAX_BATCH_PAYLOAD_SIZE = 1024 * 1024; // 1MB protected const MAX_BATCH_PAYLOAD_SIZE_HUMAN = (self::MAX_BATCH_PAYLOAD_SIZE / 1024) . 'KB'; + // flushBatch() returns this when the batch may succeed later and should remain queued. + protected const FLUSH_BATCH_RETRYABLE_FAILURE = 'retryable_failure'; + + // flushBatch() returns this when retrying the same batch would block the queue. + protected const FLUSH_BATCH_NON_RETRYABLE_FAILURE = 'non_retryable_failure'; + protected $type = "QueueConsumer"; protected $queue; @@ -129,8 +135,14 @@ public function flush() $success = true; while ($count > 0 && $success) { - $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); - $success = $this->flushBatch($batch); + $batchSize = min($this->batch_size, $count); + $batch = array_slice($this->queue, 0, $batchSize); + $result = $this->flushBatch($batch); + $success = true === $result; + + if ($success || self::FLUSH_BATCH_RETRYABLE_FAILURE !== $result) { + array_splice($this->queue, 0, $batchSize); + } $count = count($this->queue); } diff --git a/test/MockedHttpClient.php b/test/MockedHttpClient.php index 0a4dd6c..ccce706 100644 --- a/test/MockedHttpClient.php +++ b/test/MockedHttpClient.php @@ -24,6 +24,10 @@ class MockedHttpClient extends \PostHog\HttpClient /** @var int Curl error number for /flags/ endpoint (for error simulation) */ private $flagsEndpointCurlErrno; + private $batchEndpointResponse; + private $batchEndpointResponseCode; + private $batchEndpointCurlErrno; + public function __construct( string $host, bool $useSsl = true, @@ -37,7 +41,10 @@ public function __construct( ?string $flagEndpointEtag = null, int $flagEndpointResponseCode = 200, int $flagsEndpointResponseCode = 200, - int $flagsEndpointCurlErrno = 0 + int $flagsEndpointCurlErrno = 0, + $batchEndpointResponse = '{"status":1}', + int $batchEndpointResponseCode = 200, + int $batchEndpointCurlErrno = 0 ) { parent::__construct( $host, @@ -49,12 +56,17 @@ public function __construct( $curlTimeoutMilliseconds ); $this->flagEndpointResponse = $flagEndpointResponse; - $this->flagsEndpointResponse = !empty($flagsEndpointResponse) ? $flagsEndpointResponse : MockedResponses::FLAGS_REQUEST; + $this->flagsEndpointResponse = !empty($flagsEndpointResponse) + ? $flagsEndpointResponse + : MockedResponses::FLAGS_REQUEST; $this->flagEndpointEtag = $flagEndpointEtag; $this->flagEndpointResponseCode = $flagEndpointResponseCode; $this->flagEndpointResponseQueue = null; $this->flagsEndpointResponseCode = $flagsEndpointResponseCode; $this->flagsEndpointCurlErrno = $flagsEndpointCurlErrno; + $this->batchEndpointResponse = $batchEndpointResponse; + $this->batchEndpointResponseCode = $batchEndpointResponseCode; + $this->batchEndpointCurlErrno = $batchEndpointCurlErrno; } /** @@ -68,12 +80,18 @@ public function setFlagEndpointResponseQueue(array $responses): void $this->flagEndpointResponseQueue = $responses; } + // phpcs:ignore Generic.Files.LineLength.TooLong public function sendRequest(string $path, ?string $payload, array $extraHeaders = [], array $requestOptions = []): HttpResponse { if (!isset($this->calls)) { $this->calls = []; } - array_push($this->calls, array("path" => $path, "payload" => $payload, "extraHeaders" => $extraHeaders, "requestOptions" => $requestOptions)); + array_push($this->calls, array( + "path" => $path, + "payload" => $payload, + "extraHeaders" => $extraHeaders, + "requestOptions" => $requestOptions, + )); // Local evaluation endpoint: /flags/definitions?... if (str_starts_with($path, "/flags/definitions")) { @@ -115,7 +133,12 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders } if ($path === "/batch/") { - return new HttpResponse('{"status":1}', 200); + return new HttpResponse( + $this->batchEndpointResponse, + $this->batchEndpointResponseCode, + null, + $this->batchEndpointCurlErrno + ); } return parent::sendRequest($path, $payload, $extraHeaders, $requestOptions); diff --git a/test/QueueConsumerTest.php b/test/QueueConsumerTest.php new file mode 100644 index 0000000..0411813 --- /dev/null +++ b/test/QueueConsumerTest.php @@ -0,0 +1,140 @@ +message('first'); + $second = $this->message('second'); + $consumer = new QueueConsumerTestConsumer( + [QueueConsumerTestConsumer::retryableFailure()], + ['batch_size' => 2] + ); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + + $this->assertSame([[$first, $second]], $consumer->flushedBatches); + $this->assertSame([$first, $second], $consumer->queuedItems()); + } + + public function testNonRetryableFlushFailureDropsBatch(): void + { + $first = $this->message('first'); + $second = $this->message('second'); + $consumer = new QueueConsumerTestConsumer( + [QueueConsumerTestConsumer::nonRetryableFailure()], + ['batch_size' => 2] + ); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + + $this->assertSame([[$first, $second]], $consumer->flushedBatches); + $this->assertSame([], $consumer->queuedItems()); + } + + public function testFalseFlushBatchResultStillDropsBatchForCompatibility(): void + { + $first = $this->message('first'); + $second = $this->message('second'); + $consumer = new QueueConsumerTestConsumer([false], ['batch_size' => 2]); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + + $this->assertSame([], $consumer->queuedItems()); + } + + public function testSocketConnectionFailureDropsBatch(): void + { + $message = $this->message('socket-connection-failure'); + $consumer = new Socket( + 'test-key', + [ + 'batch_size' => 1, + 'host' => 'invalid.invalid', + 'ssl' => false, + 'timeout' => 0.01, + ] + ); + + $this->assertFalse($consumer->capture($message)); + + $this->assertSame([], $this->queuedItems($consumer)); + } + + public function testRetainedFailedBatchIsRetriedBeforeNewerEvents(): void + { + $first = $this->message('first'); + $second = $this->message('second'); + $third = $this->message('third'); + $consumer = new QueueConsumerTestConsumer( + [QueueConsumerTestConsumer::retryableFailure(), true], + ['batch_size' => 2] + ); + + $this->assertTrue($consumer->enqueue($first)); + $this->assertFalse($consumer->enqueue($second)); + $this->assertTrue($consumer->enqueue($third)); + + $this->assertSame([[$first, $second], [$first, $second], [$third]], $consumer->flushedBatches); + $this->assertSame([], $consumer->queuedItems()); + } + + #[DataProvider('libCurlFailureQueueBehaviorCases')] + public function testLibCurlFailureQueueBehavior( + string $event, + $batchEndpointResponse, + int $batchEndpointResponseCode, + int $batchEndpointCurlErrno, + bool $shouldKeepQueued + ): void { + $message = $this->message($event); + $httpClient = new MockedHttpClient( + 'app.posthog.com', + batchEndpointResponse: $batchEndpointResponse, + batchEndpointResponseCode: $batchEndpointResponseCode, + batchEndpointCurlErrno: $batchEndpointCurlErrno + ); + $consumer = new LibCurl('test-key', ['batch_size' => 1], $httpClient); + + $this->assertFalse($consumer->capture($message)); + + $this->assertSame($shouldKeepQueued ? [$message] : [], $this->queuedItems($consumer)); + } + + public static function libCurlFailureQueueBehaviorCases(): array + { + return [ + 'network failure keeps batch queued' => ['network-failure', false, 0, 28, true], + 'http failure drops batch' => ['http-failure', '{"status":0}', 500, 0, false], + 'payload too large drops batch' => ['payload-too-large', '{"status":0}', 413, 0, false], + ]; + } + + private function message(string $event): array + { + return [ + 'event' => $event, + 'library' => 'posthog-php', + 'library_version' => 'test', + ]; + } + + private function queuedItems($consumer): array + { + $reflection = new \ReflectionClass(QueueConsumer::class); + $queueProperty = $reflection->getProperty('queue'); + + return $queueProperty->getValue($consumer); + } +} diff --git a/test/QueueConsumerTestConsumer.php b/test/QueueConsumerTestConsumer.php new file mode 100644 index 0000000..d40ea14 --- /dev/null +++ b/test/QueueConsumerTestConsumer.php @@ -0,0 +1,49 @@ +flushResults = $flushResults; + parent::__construct('test-key', $options); + } + + public function __destruct() + { + // Avoid destructor-triggered flushes changing assertions in tests. + } + + public static function retryableFailure() + { + return self::FLUSH_BATCH_RETRYABLE_FAILURE; + } + + public static function nonRetryableFailure() + { + return self::FLUSH_BATCH_NON_RETRYABLE_FAILURE; + } + + public function queuedItems(): array + { + return $this->queue; + } + + public function flushBatch($batch) + { + $this->flushedBatches[] = $batch; + + if ([] === $this->flushResults) { + return true; + } + + return array_shift($this->flushResults); + } +}