From 464e5ad01a4782a192af7714032993a04b5e0d01 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 09:01:50 +0300 Subject: [PATCH 01/12] feat(async): add fiber future select methods --- composer.json | 1 + src/Client/ClickHouseAsyncClient.php | 29 ++++++++++++++ src/Client/PsrClickHouseAsyncClient.php | 51 +++++++++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/composer.json b/composer.json index e8cd916..e0a4a84 100644 --- a/composer.json +++ b/composer.json @@ -29,6 +29,7 @@ }, "require": { "php": "^8.4", + "amphp/amp": "^3.1", "guzzlehttp/promises": "^2.0", "guzzlehttp/psr7": "^2.6", "php-http/client-common": "^2.0", diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index b40590b..9824dba 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\Future; use GuzzleHttp\Promise\PromiseInterface; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; @@ -23,6 +24,19 @@ public function select( SettingsProvider $settings = new EmptySettingsProvider(), ): PromiseInterface; + /** + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectFuture( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; + /** * @param array $params * @param Format $outputFormat @@ -35,4 +49,19 @@ public function selectWithParams( Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), ): PromiseInterface; + + /** + * @param array $params + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectWithParamsFuture( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; } diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index de6b51c..d4c13b0 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,6 +4,8 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\DeferredFuture; +use Amp\Future; use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; @@ -21,6 +23,7 @@ use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; +use Throwable; use function uniqid; @@ -50,6 +53,19 @@ public function select( return $this->selectWithParams($query, [], $outputFormat, $settings); } + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectFuture( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return $this->selectWithParamsFuture($query, [], $outputFormat, $settings); + } + /** * {@inheritDoc} * @@ -78,6 +94,41 @@ public function selectWithParams( ); } + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectWithParamsFuture( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return self::futureForPromise($this->selectWithParams($query, $params, $outputFormat, $settings)); + } + + /** + * @param PromiseInterface $promise + * + * @return Future + * + * @template T + */ + private static function futureForPromise(PromiseInterface $promise): Future + { + $deferred = new DeferredFuture(); + + $promise->then( + static fn (mixed $value) => $deferred->complete($value), + static fn (mixed $reason) => $deferred->error( + $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), + ), + ); + + return $deferred->getFuture(); + } + /** * @param array $params * @param (callable(ResponseInterface):mixed)|null $processResponse From 0aaef923106a4433d513c30468757f88f596d400 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 10:37:21 +0300 Subject: [PATCH 02/12] refactor(async): return futures from async client --- composer.json | 1 - src/Client/ClickHouseAsyncClient.php | 29 +-------- src/Client/PsrClickHouseAsyncClient.php | 87 ++++++++----------------- tests/Client/SelectAsyncTest.php | 9 +-- 4 files changed, 33 insertions(+), 93 deletions(-) diff --git a/composer.json b/composer.json index e0a4a84..f925bc5 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,6 @@ "require": { "php": "^8.4", "amphp/amp": "^3.1", - "guzzlehttp/promises": "^2.0", "guzzlehttp/psr7": "^2.6", "php-http/client-common": "^2.0", "psr/http-client": "^1.0", diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index 9824dba..d61c00c 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -5,7 +5,6 @@ namespace SimPod\ClickHouseClient\Client; use Amp\Future; -use GuzzleHttp\Promise\PromiseInterface; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; @@ -13,17 +12,6 @@ interface ClickHouseAsyncClient { - /** - * @param Format $outputFormat - * - * @template O of Output - */ - public function select( - string $query, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; - /** * @param Format $outputFormat * @@ -31,25 +19,12 @@ public function select( * * @template O of Output */ - public function selectFuture( + public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), ): Future; - /** - * @param array $params - * @param Format $outputFormat - * - * @template O of Output - */ - public function selectWithParams( - string $query, - array $params, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; - /** * @param array $params * @param Format $outputFormat @@ -58,7 +33,7 @@ public function selectWithParams( * * @template O of Output */ - public function selectWithParamsFuture( + public function selectWithParams( string $query, array $params, Format $outputFormat, diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d4c13b0..d89141b 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -49,21 +49,8 @@ public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { - return $this->selectWithParams($query, [], $outputFormat, $settings); - } - - /** - * {@inheritDoc} - * - * @throws Exception - */ - public function selectFuture( - string $query, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), ): Future { - return $this->selectWithParamsFuture($query, [], $outputFormat, $settings); + return $this->selectWithParams($query, [], $outputFormat, $settings); } /** @@ -76,7 +63,7 @@ public function selectWithParams( array $params, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { + ): Future { $formatClause = $outputFormat::toSql(); $sql = $this->sqlFactory->createWithParameters($query, $params); @@ -94,41 +81,6 @@ public function selectWithParams( ); } - /** - * {@inheritDoc} - * - * @throws Exception - */ - public function selectWithParamsFuture( - string $query, - array $params, - Format $outputFormat, - SettingsProvider $settings = new EmptySettingsProvider(), - ): Future { - return self::futureForPromise($this->selectWithParams($query, $params, $outputFormat, $settings)); - } - - /** - * @param PromiseInterface $promise - * - * @return Future - * - * @template T - */ - private static function futureForPromise(PromiseInterface $promise): Future - { - $deferred = new DeferredFuture(); - - $promise->then( - static fn (mixed $value) => $deferred->complete($value), - static fn (mixed $reason) => $deferred->error( - $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), - ), - ); - - return $deferred->getFuture(); - } - /** * @param array $params * @param (callable(ResponseInterface):mixed)|null $processResponse @@ -140,7 +92,7 @@ private function executeRequest( array $params, SettingsProvider $settings, callable|null $processResponse, - ): PromiseInterface { + ): Future { $request = $this->requestFactory->prepareSqlRequest( $sql, new RequestSettings( @@ -155,11 +107,11 @@ private function executeRequest( $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); - return Create::promiseFor( - $this->asyncClient->sendAsyncRequest($request), - ) - ->then( - function (ResponseInterface $response) use ($id, $processResponse) { + $deferred = new DeferredFuture(); + + $this->asyncClient->sendAsyncRequest($request)->then( + function (ResponseInterface $response) use ($deferred, $id, $processResponse): void { + try { $this->sqlLogger?->stopQuery($id); if ($response->getStatusCode() !== 200) { @@ -186,12 +138,25 @@ function (ResponseInterface $response) use ($id, $processResponse) { Message::rewindBody($response); if ($processResponse === null) { - return $response; + $deferred->complete($response); + + return; } - return $processResponse($response); - }, - fn () => $this->sqlLogger?->stopQuery($id), - ); + $deferred->complete($processResponse($response)); + } catch (Throwable $throwable) { + $deferred->error($throwable); + } + }, + function (mixed $reason) use ($deferred, $id): void { + $this->sqlLogger?->stopQuery($id); + + $deferred->error( + $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), + ); + }, + ); + + return $deferred->getFuture(); } } diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index bda77ff..acdfc5c 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,7 +4,6 @@ namespace SimPod\ClickHouseClient\Tests\Client; -use GuzzleHttp\Promise\Utils; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; @@ -15,6 +14,8 @@ use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; +use function Amp\Future\await; + #[CoversClass(RequestFactory::class)] #[CoversClass(PsrClickHouseAsyncClient::class)] #[CoversClass(ServerError::class)] @@ -35,7 +36,7 @@ public function testAsyncSelect(): void /** @var Json $format */ $format = new Json(); - $promises = [ + $futures = [ $client->select($sql, $format), $client->select($sql, $format), ]; @@ -46,7 +47,7 @@ public function testAsyncSelect(): void * \SimPod\ClickHouseClient\Output\Json * } $jsonOutputs */ - $jsonOutputs = Utils::all($promises)->wait(); + $jsonOutputs = await($futures); $expectedData = ClickHouseVersion::quotes64BitIntegersInJson() ? [['number' => '0'], ['number' => '1']] @@ -60,6 +61,6 @@ public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); - self::$asyncClient->select('table', new TabSeparated())->wait(); + self::$asyncClient->select('table', new TabSeparated())->await(); } } From c24f33a04c0a9badf11e4199b1fcaa6303caa8d1 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 10:49:24 +0300 Subject: [PATCH 03/12] refactor(async): use amp http client transport --- composer.json | 2 +- src/Client/PsrClickHouseAsyncClient.php | 110 +++++++++++------------- src/Exception/ServerError.php | 7 +- tests/WithClient.php | 13 ++- 4 files changed, 62 insertions(+), 70 deletions(-) diff --git a/composer.json b/composer.json index f925bc5..b932f04 100644 --- a/composer.json +++ b/composer.json @@ -30,8 +30,8 @@ "require": { "php": "^8.4", "amphp/amp": "^3.1", + "amphp/http-client": "^5.3", "guzzlehttp/psr7": "^2.6", - "php-http/client-common": "^2.0", "psr/http-client": "^1.0", "psr/http-factory": "^1.0", "psr/http-message": "^2.0", diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d89141b..d818f2c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,36 +4,36 @@ namespace SimPod\ClickHouseClient\Client; -use Amp\DeferredFuture; use Amp\Future; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Request as AmpRequest; use Exception; -use GuzzleHttp\Promise\Create; -use GuzzleHttp\Promise\PromiseInterface; -use GuzzleHttp\Psr7\Message; -use Http\Client\HttpAsyncClient; -use Psr\Http\Message\ResponseInterface; -use RuntimeException; +use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; +use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; use Throwable; +use function Amp\async; use function uniqid; class PsrClickHouseAsyncClient implements ClickHouseAsyncClient { private SqlFactory $sqlFactory; + /** @param array $defaultHeaders */ public function __construct( - private HttpAsyncClient $asyncClient, + private HttpClient $client, private RequestFactory $requestFactory, + private array $defaultHeaders = [], private SqlLogger|null $sqlLogger = null, private SettingsProvider $defaultSettings = new EmptySettingsProvider(), ) { @@ -75,15 +75,13 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (ResponseInterface $response) => $outputFormat::output( - $response->getBody()->__toString(), - ), + processResponse: static fn (string $body): Output => $outputFormat::output($body), ); } /** * @param array $params - * @param (callable(ResponseInterface):mixed)|null $processResponse + * @param (callable(string):mixed)|null $processResponse * * @throws Exception */ @@ -104,59 +102,53 @@ private function executeRequest( ), ); - $id = uniqid('', true); - $this->sqlLogger?->startQuery($id, $sql); - - $deferred = new DeferredFuture(); - - $this->asyncClient->sendAsyncRequest($request)->then( - function (ResponseInterface $response) use ($deferred, $id, $processResponse): void { - try { - $this->sqlLogger?->stopQuery($id); - - if ($response->getStatusCode() !== 200) { - throw ServerError::fromResponse($response); - } - - $body = $response->getBody(); - if (! $body->isSeekable()) { - throw new RuntimeException( - 'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.', - ); - } + return async(function () use ($processResponse, $request, $sql): mixed { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $response = $this->client->request($this->toAmpRequest($request)); + $body = $response->getBody()->buffer(); + + if ( + $response->getStatus() !== 200 + || ServerError::bodyContainsStreamedException( + $body, + $response->getHeader('X-ClickHouse-Exception-Tag') ?? '', + ) + ) { + throw ServerError::fromResponseContent($body, $response->getStatus()); + } - $bodyContent = $body->__toString(); - if ( - ServerError::bodyContainsStreamedException( - $bodyContent, - $response->getHeaderLine('X-ClickHouse-Exception-Tag'), - ) - ) { - throw ServerError::fromResponse($response); - } + if ($processResponse === null) { + return $body; + } - Message::rewindBody($response); + return $processResponse($body); + } catch (Throwable $throwable) { + $this->sqlLogger?->stopQuery($id); - if ($processResponse === null) { - $deferred->complete($response); + throw $throwable; + } + }); + } - return; - } + private function toAmpRequest(RequestInterface $request): AmpRequest + { + $ampRequest = new AmpRequest( + $request->getUri(), + $request->getMethod(), + $request->getBody()->__toString(), + ); - $deferred->complete($processResponse($response)); - } catch (Throwable $throwable) { - $deferred->error($throwable); - } - }, - function (mixed $reason) use ($deferred, $id): void { - $this->sqlLogger?->stopQuery($id); + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } - $deferred->error( - $reason instanceof Throwable ? $reason : new RuntimeException('ClickHouse promise rejected'), - ); - }, - ); + foreach ($request->getHeaders() as $name => $values) { + $ampRequest->setHeader($name, $values); + } - return $deferred->getFuture(); + return $ampRequest; } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index c2ef5d1..ec6224b 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -23,8 +23,11 @@ private function __construct( public static function fromResponse(ResponseInterface $response): self { - $bodyContent = $response->getBody()->__toString(); + return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode()); + } + public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self + { $errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -36,7 +39,7 @@ public static function fromResponse(ResponseInterface $response): self return new self( $bodyContent, $errorCode, - $response->getStatusCode(), + $httpStatusCode, $exceptionName, ); } diff --git a/tests/WithClient.php b/tests/WithClient.php index cea96f4..f996e9e 100644 --- a/tests/WithClient.php +++ b/tests/WithClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Tests; +use Amp\Http\Client\HttpClientBuilder; use InvalidArgumentException; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\After; @@ -18,12 +19,12 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use Symfony\Component\HttpClient\CurlHttpClient; -use Symfony\Component\HttpClient\HttplugClient; use Symfony\Component\HttpClient\Psr18Client; use function assert; use function getenv; use function is_string; +use function rawurlencode; use function sprintf; use function time; @@ -111,19 +112,15 @@ private static function restartClickHouseClient(): void ); static::$asyncClient = new PsrClickHouseAsyncClient( - new HttplugClient( - new CurlHttpClient([ - 'base_uri' => $endpoint, - 'headers' => $headers, - 'query' => ['database' => static::$currentDbName], - ]), - ), + HttpClientBuilder::buildDefault(), new RequestFactory( new ParamValueConverterRegistry(), new Psr17Factory(), new Psr17Factory(), new Psr17Factory(), + $endpoint . '?database=' . rawurlencode(static::$currentDbName), ), + $headers, ); static::$controllerClient->executeQuery(sprintf('DROP DATABASE IF EXISTS "%s"', static::$currentDbName)); From cf0c17a2edd7e9d9b08e0424321be91a3ba266c9 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 11:17:05 +0300 Subject: [PATCH 04/12] refactor(async): use amp psr7 adapter --- composer.json | 1 + src/Client/PsrClickHouseAsyncClient.php | 48 ++++++++++--------------- tests/WithClient.php | 2 ++ 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/composer.json b/composer.json index b932f04..90ec8aa 100644 --- a/composer.json +++ b/composer.json @@ -31,6 +31,7 @@ "php": "^8.4", "amphp/amp": "^3.1", "amphp/http-client": "^5.3", + "amphp/http-client-psr7": "^1.1", "guzzlehttp/psr7": "^2.6", "psr/http-client": "^1.0", "psr/http-factory": "^1.0", diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d818f2c..665772a 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -6,9 +6,8 @@ use Amp\Future; use Amp\Http\Client\HttpClient; -use Amp\Http\Client\Request as AmpRequest; +use Amp\Http\Client\Psr7\PsrAdapter; use Exception; -use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -29,10 +28,11 @@ class PsrClickHouseAsyncClient implements ClickHouseAsyncClient { private SqlFactory $sqlFactory; - /** @param array $defaultHeaders */ + /** @param array $defaultHeaders */ public function __construct( private HttpClient $client, private RequestFactory $requestFactory, + private PsrAdapter $psrAdapter, private array $defaultHeaders = [], private SqlLogger|null $sqlLogger = null, private SettingsProvider $defaultSettings = new EmptySettingsProvider(), @@ -81,15 +81,19 @@ public function selectWithParams( /** * @param array $params - * @param (callable(string):mixed)|null $processResponse + * @param callable(string):T $processResponse + * + * @return Future * * @throws Exception + * + * @template T */ private function executeRequest( string $sql, array $params, SettingsProvider $settings, - callable|null $processResponse, + callable $processResponse, ): Future { $request = $this->requestFactory->prepareSqlRequest( $sql, @@ -102,12 +106,19 @@ private function executeRequest( ), ); - return async(function () use ($processResponse, $request, $sql): mixed { + /** @var Future $future */ + $future = async(function () use ($processResponse, $request, $sql): mixed { $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); try { - $response = $this->client->request($this->toAmpRequest($request)); + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + $response = $this->client->request($ampRequest); $body = $response->getBody()->buffer(); if ( @@ -120,10 +131,6 @@ private function executeRequest( throw ServerError::fromResponseContent($body, $response->getStatus()); } - if ($processResponse === null) { - return $body; - } - return $processResponse($body); } catch (Throwable $throwable) { $this->sqlLogger?->stopQuery($id); @@ -131,24 +138,7 @@ private function executeRequest( throw $throwable; } }); - } - - private function toAmpRequest(RequestInterface $request): AmpRequest - { - $ampRequest = new AmpRequest( - $request->getUri(), - $request->getMethod(), - $request->getBody()->__toString(), - ); - - foreach ($this->defaultHeaders as $name => $values) { - $ampRequest->setHeader($name, $values); - } - - foreach ($request->getHeaders() as $name => $values) { - $ampRequest->setHeader($name, $values); - } - return $ampRequest; + return $future; } } diff --git a/tests/WithClient.php b/tests/WithClient.php index f996e9e..d3095ef 100644 --- a/tests/WithClient.php +++ b/tests/WithClient.php @@ -5,6 +5,7 @@ namespace SimPod\ClickHouseClient\Tests; use Amp\Http\Client\HttpClientBuilder; +use Amp\Http\Client\Psr7\PsrAdapter; use InvalidArgumentException; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\After; @@ -120,6 +121,7 @@ private static function restartClickHouseClient(): void new Psr17Factory(), $endpoint . '?database=' . rawurlencode(static::$currentDbName), ), + new PsrAdapter(new Psr17Factory(), new Psr17Factory()), $headers, ); From b32a4fa4ec5ab8b0794e871de41a9cec409497f6 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 11:42:10 +0300 Subject: [PATCH 05/12] style(async): remove arrow function return type --- src/Client/PsrClickHouseAsyncClient.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 665772a..cdaa996 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -14,7 +14,6 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; -use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; @@ -75,7 +74,7 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (string $body): Output => $outputFormat::output($body), + processResponse: static fn (string $body) => $outputFormat::output($body), ); } From e175e0b595bddffdd9049b79bd4a40cb13bbbad6 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 12:21:26 +0300 Subject: [PATCH 06/12] feat(async): add select stream methods --- src/Client/ClickHouseAsyncClient.php | 29 ++++++++ src/Client/PsrClickHouseAsyncClient.php | 92 +++++++++++++++++++++++++ tests/Client/SelectAsyncTest.php | 18 +++++ 3 files changed, 139 insertions(+) diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index d61c00c..71081f2 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\ByteStream\Payload; use Amp\Future; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; @@ -39,4 +40,32 @@ public function selectWithParams( Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), ): Future; + + /** + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; + + /** + * @param array $params + * @param Format $outputFormat + * + * @return Future + * + * @template O of Output + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; } diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index cdaa996..4275057 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\ByteStream\Payload; use Amp\Future; use Amp\Http\Client\HttpClient; use Amp\Http\Client\Psr7\PsrAdapter; @@ -78,6 +79,44 @@ public function selectWithParams( ); } + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return $this->selectStreamWithParams($query, [], $outputFormat, $settings); + } + + /** + * {@inheritDoc} + * + * @throws Exception + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + $formatClause = $outputFormat::toSql(); + + $sql = $this->sqlFactory->createWithParameters($query, $params); + + return $this->executeStreamRequest( + << $params * @param callable(string):T $processResponse @@ -140,4 +179,57 @@ private function executeRequest( return $future; } + + /** + * @param array $params + * + * @return Future + * + * @throws Exception + */ + private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future + { + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( + $this->defaultSettings, + $settings, + ), + new RequestOptions( + $params, + ), + ); + + /** @var Future $future */ + $future = async(function () use ($request, $sql): Payload { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + $response = $this->client->request($ampRequest); + $this->sqlLogger?->stopQuery($id); + + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent( + $response->getBody()->buffer(), + $response->getStatus(), + ); + } + + return $response->getBody(); + } catch (Throwable $throwable) { + $this->sqlLogger?->stopQuery($id); + + throw $throwable; + } + }); + + return $future; + } } diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index acdfc5c..46f4fd7 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -63,4 +63,22 @@ public function testSelectFromNonExistentTableExpectServerError(): void self::$asyncClient->select('table', new TabSeparated())->await(); } + + public function testAsyncSelectStream(): void + { + $stream = self::$asyncClient->selectStream('SELECT 1 AS data', new TabSeparated())->await(); + + self::assertSame("1\n", $stream->buffer()); + } + + public function testAsyncSelectStreamWithParams(): void + { + $stream = self::$asyncClient->selectStreamWithParams( + 'SELECT {p1:UInt8} AS data', + ['p1' => 3], + new TabSeparated(), + )->await(); + + self::assertSame("3\n", $stream->buffer()); + } } From 153d04feccc461fc6690a5c8ec80b9e63d43465d Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 12:34:53 +0300 Subject: [PATCH 07/12] test(async): cover default request headers --- src/Client/PsrClickHouseAsyncClient.php | 31 ++++++++-------- tests/Client/SelectAsyncTest.php | 48 +++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 4275057..a81298c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -8,7 +8,10 @@ use Amp\Future; use Amp\Http\Client\HttpClient; use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request as AmpRequest; +use Error; use Exception; +use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -150,13 +153,7 @@ private function executeRequest( $this->sqlLogger?->startQuery($id, $sql); try { - $ampRequest = $this->psrAdapter->fromPsrRequest($request); - - foreach ($this->defaultHeaders as $name => $values) { - $ampRequest->setHeader($name, $values); - } - - $response = $this->client->request($ampRequest); + $response = $this->client->request($this->toAmpRequest($request)); $body = $response->getBody()->buffer(); if ( @@ -206,13 +203,7 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid $this->sqlLogger?->startQuery($id, $sql); try { - $ampRequest = $this->psrAdapter->fromPsrRequest($request); - - foreach ($this->defaultHeaders as $name => $values) { - $ampRequest->setHeader($name, $values); - } - - $response = $this->client->request($ampRequest); + $response = $this->client->request($this->toAmpRequest($request)); $this->sqlLogger?->stopQuery($id); if ($response->getStatus() !== 200) { @@ -232,4 +223,16 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid return $future; } + + /** @throws Error */ + private function toAmpRequest(RequestInterface $request): AmpRequest + { + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + return $ampRequest; + } } diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index 46f4fd7..1a33e19 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,12 +4,21 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use Amp\Cancellation; +use Amp\Http\Client\DelegateHttpClient; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request; +use Amp\Http\Client\Response; +use Amp\Http\InvalidHeaderException; +use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Json; use SimPod\ClickHouseClient\Format\TabSeparated; +use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\ClickHouseVersion; use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; @@ -57,6 +66,45 @@ public function testAsyncSelect(): void self::assertSame($expectedData, $jsonOutputs[1]->data); } + public function testDefaultHeadersAreSent(): void + { + $delegate = new class implements DelegateHttpClient { + public Request|null $request = null; + + /** @throws InvalidHeaderException */ + public function request(Request $request, Cancellation $cancellation): Response + { + $this->request = $request; + + return new Response('1.1', 200, null, [], "1\n", $request); + } + }; + + $psr17Factory = new Psr17Factory(); + $client = new PsrClickHouseAsyncClient( + new HttpClient($delegate, []), + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + 'https://clickhouse.example', + ), + new PsrAdapter($psr17Factory, $psr17Factory), + [ + 'X-ClickHouse-Key' => 'secret', + 'X-ClickHouse-User' => 'user', + ], + ); + + $client->select('SELECT 1', new TabSeparated())->await(); + + $request = $delegate->request; + self::assertInstanceOf(Request::class, $request); + self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); + self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + } + public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); From a59925aaf8dff60ac09138dc3b7b2dad7decac28 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 12:56:36 +0300 Subject: [PATCH 08/12] test(async): isolate default header coverage --- tests/Client/PsrClickHouseAsyncClientTest.php | 65 +++++++++++++++++++ tests/Client/SelectAsyncTest.php | 48 -------------- 2 files changed, 65 insertions(+), 48 deletions(-) create mode 100644 tests/Client/PsrClickHouseAsyncClientTest.php diff --git a/tests/Client/PsrClickHouseAsyncClientTest.php b/tests/Client/PsrClickHouseAsyncClientTest.php new file mode 100644 index 0000000..b3816d3 --- /dev/null +++ b/tests/Client/PsrClickHouseAsyncClientTest.php @@ -0,0 +1,65 @@ +request = $request; + + return new Response('1.1', 200, null, [], "1\n", $request); + } + }; + + $psr17Factory = new Psr17Factory(); + $client = new PsrClickHouseAsyncClient( + new HttpClient($delegate, []), + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + 'https://clickhouse.example', + ), + new PsrAdapter($psr17Factory, $psr17Factory), + [ + 'X-ClickHouse-Key' => 'secret', + 'X-ClickHouse-User' => 'user', + ], + ); + + $client->select('SELECT 1', new TabSeparated())->await(); + + $request = $delegate->request; + self::assertInstanceOf(Request::class, $request); + self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); + self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + } +} diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index 1a33e19..46f4fd7 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,21 +4,12 @@ namespace SimPod\ClickHouseClient\Tests\Client; -use Amp\Cancellation; -use Amp\Http\Client\DelegateHttpClient; -use Amp\Http\Client\HttpClient; -use Amp\Http\Client\Psr7\PsrAdapter; -use Amp\Http\Client\Request; -use Amp\Http\Client\Response; -use Amp\Http\InvalidHeaderException; -use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Json; use SimPod\ClickHouseClient\Format\TabSeparated; -use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\ClickHouseVersion; use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; @@ -66,45 +57,6 @@ public function testAsyncSelect(): void self::assertSame($expectedData, $jsonOutputs[1]->data); } - public function testDefaultHeadersAreSent(): void - { - $delegate = new class implements DelegateHttpClient { - public Request|null $request = null; - - /** @throws InvalidHeaderException */ - public function request(Request $request, Cancellation $cancellation): Response - { - $this->request = $request; - - return new Response('1.1', 200, null, [], "1\n", $request); - } - }; - - $psr17Factory = new Psr17Factory(); - $client = new PsrClickHouseAsyncClient( - new HttpClient($delegate, []), - new RequestFactory( - new ParamValueConverterRegistry(), - $psr17Factory, - $psr17Factory, - $psr17Factory, - 'https://clickhouse.example', - ), - new PsrAdapter($psr17Factory, $psr17Factory), - [ - 'X-ClickHouse-Key' => 'secret', - 'X-ClickHouse-User' => 'user', - ], - ); - - $client->select('SELECT 1', new TabSeparated())->await(); - - $request = $delegate->request; - self::assertInstanceOf(Request::class, $request); - self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); - self::assertSame('user', $request->getHeader('X-ClickHouse-User')); - } - public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); From 734aeeb3fae682bdd3d5965848298dc7f772878b Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Fri, 26 Jun 2026 14:51:19 +0300 Subject: [PATCH 09/12] fix(async): stop query logging once --- src/Client/PsrClickHouseAsyncClient.php | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index a81298c..0b1c3d6 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -22,7 +22,6 @@ use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; -use Throwable; use function Amp\async; use function uniqid; @@ -46,6 +45,7 @@ public function __construct( /** * {@inheritDoc} * + * @throws Error * @throws Exception */ public function select( @@ -167,10 +167,8 @@ private function executeRequest( } return $processResponse($body); - } catch (Throwable $throwable) { + } finally { $this->sqlLogger?->stopQuery($id); - - throw $throwable; } }); @@ -182,6 +180,7 @@ private function executeRequest( * * @return Future * + * @throws Error * @throws Exception */ private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future @@ -204,7 +203,6 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid try { $response = $this->client->request($this->toAmpRequest($request)); - $this->sqlLogger?->stopQuery($id); if ($response->getStatus() !== 200) { throw ServerError::fromResponseContent( @@ -214,10 +212,8 @@ private function executeStreamRequest(string $sql, array $params, SettingsProvid } return $response->getBody(); - } catch (Throwable $throwable) { + } finally { $this->sqlLogger?->stopQuery($id); - - throw $throwable; } }); From 5778d80058508569bf96bfb37e86c706d1ed6874 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Wed, 1 Jul 2026 11:07:18 +0300 Subject: [PATCH 10/12] test(async): update streamed exception tests --- src/Client/PsrClickHouseAsyncClient.php | 4 ++ ...rClickHouseClientStreamedExceptionTest.php | 57 +++++++++++-------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 0b1c3d6..7dfd23c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -59,6 +59,7 @@ public function select( /** * {@inheritDoc} * + * @throws Error * @throws Exception */ public function selectWithParams( @@ -85,6 +86,7 @@ public function selectWithParams( /** * {@inheritDoc} * + * @throws Error * @throws Exception */ public function selectStream( @@ -98,6 +100,7 @@ public function selectStream( /** * {@inheritDoc} * + * @throws Error * @throws Exception */ public function selectStreamWithParams( @@ -126,6 +129,7 @@ public function selectStreamWithParams( * * @return Future * + * @throws Error * @throws Exception * * @template T diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php index 456627a..86c6d05 100644 --- a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -4,9 +4,14 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use Amp\Cancellation; +use Amp\Http\Client\DelegateHttpClient; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request; +use Amp\Http\Client\Response; +use Amp\Http\InvalidHeaderException; use GuzzleHttp\Psr7\NoSeekStream; -use Http\Client\HttpAsyncClient; -use Http\Promise\FulfilledPromise; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use Psr\Http\Client\ClientInterface; @@ -187,18 +192,23 @@ public function sendRequest(RequestInterface $request): ResponseInterface public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedException(): void { $psr17Factory = new Psr17Factory(); - $response = $psr17Factory->createResponse(200) - ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); - $httpClient = new class ($response) implements HttpAsyncClient { - public function __construct(private ResponseInterface $response) + $delegate = new class (self::streamedExceptionBody()) implements DelegateHttpClient { + public function __construct(private string $body) { } - public function sendAsyncRequest(RequestInterface $request): FulfilledPromise + /** @throws InvalidHeaderException */ + public function request(Request $request, Cancellation $cancellation): Response { - return new FulfilledPromise($this->response); + return new Response( + '1.1', + 200, + null, + ['X-ClickHouse-Exception-Tag' => 'abcdefghijklmnop'], + $this->body, + $request, + ); } }; @@ -219,18 +229,20 @@ public function stopQuery(string $id): void }; $client = new PsrClickHouseAsyncClient( - $httpClient, + new HttpClient($delegate, []), new RequestFactory( new ParamValueConverterRegistry(), $psr17Factory, $psr17Factory, $psr17Factory, ), + new PsrAdapter($psr17Factory, $psr17Factory), + [], $logger, ); try { - $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated())->wait(); + $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated())->await(); self::fail('ServerError was not thrown.'); } catch (ServerError $serverError) { self::assertSame(395, $serverError->getCode()); @@ -242,37 +254,32 @@ public function stopQuery(string $id): void self::assertSame(1, $logger->stopCount); } - public function testAsyncSelectThrowsWhenStreamedExceptionInspectionWouldConsumeNonSeekableBody(): void + public function testAsyncSelectReturnsSuccessfulOkResponseAfterStreamedExceptionInspection(): void { $psr17Factory = new Psr17Factory(); - $response = $psr17Factory->createResponse(200) - ->withBody(new NoSeekStream($psr17Factory->createStream("1\n"))); - $httpClient = new class ($response) implements HttpAsyncClient { - public function __construct(private ResponseInterface $response) + $delegate = new class implements DelegateHttpClient { + /** @throws InvalidHeaderException */ + public function request(Request $request, Cancellation $cancellation): Response { - } - - public function sendAsyncRequest(RequestInterface $request): FulfilledPromise - { - return new FulfilledPromise($this->response); + return new Response('1.1', 200, null, [], "1\n", $request); } }; $client = new PsrClickHouseAsyncClient( - $httpClient, + new HttpClient($delegate, []), new RequestFactory( new ParamValueConverterRegistry(), $psr17Factory, $psr17Factory, $psr17Factory, ), + new PsrAdapter($psr17Factory, $psr17Factory), ); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage('Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.'); + $output = $client->select('SELECT 1', new TabSeparated())->await(); - $client->select('SELECT 1', new TabSeparated())->wait(); + self::assertSame("1\n", $output->contents); } private static function streamedExceptionBody(): string From 5724f919f9a193b0ff66e39e6e90f96021d2ce80 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Wed, 1 Jul 2026 11:31:43 +0300 Subject: [PATCH 11/12] docs(async): fix stream format generics --- src/Client/ClickHouseAsyncClient.php | 10 +++------- src/Client/PsrClickHouseAsyncClient.php | 6 ++++++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index 71081f2..2a38351 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -42,11 +42,9 @@ public function selectWithParams( ): Future; /** - * @param Format $outputFormat + * @param Format> $outputFormat * * @return Future - * - * @template O of Output */ public function selectStream( string $query, @@ -55,12 +53,10 @@ public function selectStream( ): Future; /** - * @param array $params - * @param Format $outputFormat + * @param array $params + * @param Format> $outputFormat * * @return Future - * - * @template O of Output */ public function selectStreamWithParams( string $query, diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 7dfd23c..56e55dc 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -18,6 +18,7 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; +use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; @@ -86,6 +87,8 @@ public function selectWithParams( /** * {@inheritDoc} * + * @param Format> $outputFormat + * * @throws Error * @throws Exception */ @@ -100,6 +103,9 @@ public function selectStream( /** * {@inheritDoc} * + * @param array $params + * @param Format> $outputFormat + * * @throws Error * @throws Exception */ From 2a0c0836c497eb18d69e31e704004b1e17184f77 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Wed, 1 Jul 2026 12:02:39 +0300 Subject: [PATCH 12/12] fix(async): disable default request limits --- src/Client/PsrClickHouseAsyncClient.php | 5 +++++ tests/Client/PsrClickHouseAsyncClientTest.php | 3 +++ 2 files changed, 8 insertions(+) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 56e55dc..4660ca2 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -239,6 +239,11 @@ private function toAmpRequest(RequestInterface $request): AmpRequest $ampRequest->setHeader($name, $values); } + // ClickHouse queries may be long-running and can stream large result sets. + $ampRequest->setTransferTimeout(0); + $ampRequest->setInactivityTimeout(0); + $ampRequest->setBodySizeLimit(0); + return $ampRequest; } } diff --git a/tests/Client/PsrClickHouseAsyncClientTest.php b/tests/Client/PsrClickHouseAsyncClientTest.php index b3816d3..b4b3861 100644 --- a/tests/Client/PsrClickHouseAsyncClientTest.php +++ b/tests/Client/PsrClickHouseAsyncClientTest.php @@ -61,5 +61,8 @@ public function request(Request $request, Cancellation $cancellation): Response self::assertInstanceOf(Request::class, $request); self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + self::assertSame(0.0, $request->getTransferTimeout()); + self::assertSame(0.0, $request->getInactivityTimeout()); + self::assertSame(0, $request->getBodySizeLimit()); } }