diff --git a/composer.json b/composer.json index e8cd916..90ec8aa 100644 --- a/composer.json +++ b/composer.json @@ -29,9 +29,10 @@ }, "require": { "php": "^8.4", - "guzzlehttp/promises": "^2.0", + "amphp/amp": "^3.1", + "amphp/http-client": "^5.3", + "amphp/http-client-psr7": "^1.1", "guzzlehttp/psr7": "^2.6", - "php-http/client-common": "^2.0", "psr/http-client": "^1.0", "psr/http-factory": "^1.0", "psr/http-message": "^2.0", diff --git a/src/Client/ClickHouseAsyncClient.php b/src/Client/ClickHouseAsyncClient.php index b40590b..2a38351 100644 --- a/src/Client/ClickHouseAsyncClient.php +++ b/src/Client/ClickHouseAsyncClient.php @@ -4,7 +4,8 @@ namespace SimPod\ClickHouseClient\Client; -use GuzzleHttp\Promise\PromiseInterface; +use Amp\ByteStream\Payload; +use Amp\Future; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; @@ -15,18 +16,22 @@ interface ClickHouseAsyncClient /** * @param Format $outputFormat * + * @return Future + * * @template O of Output */ public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; + ): Future; /** * @param array $params * @param Format $outputFormat * + * @return Future + * * @template O of Output */ public function selectWithParams( @@ -34,5 +39,29 @@ public function selectWithParams( array $params, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface; + ): Future; + + /** + * @param Format> $outputFormat + * + * @return Future + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; + + /** + * @param array $params + * @param Format> $outputFormat + * + * @return Future + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future; } diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index de6b51c..4660ca2 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -4,33 +4,39 @@ namespace SimPod\ClickHouseClient\Client; +use Amp\ByteStream\Payload; +use Amp\Future; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request as AmpRequest; +use Error; use Exception; -use GuzzleHttp\Promise\Create; -use GuzzleHttp\Promise\PromiseInterface; -use GuzzleHttp\Psr7\Message; -use Http\Client\HttpAsyncClient; -use Psr\Http\Message\ResponseInterface; -use RuntimeException; +use Psr\Http\Message\RequestInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; +use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; +use function Amp\async; use function uniqid; class PsrClickHouseAsyncClient implements ClickHouseAsyncClient { private SqlFactory $sqlFactory; + /** @param array $defaultHeaders */ public function __construct( - private HttpAsyncClient $asyncClient, + private HttpClient $client, private RequestFactory $requestFactory, + private PsrAdapter $psrAdapter, + private array $defaultHeaders = [], private SqlLogger|null $sqlLogger = null, private SettingsProvider $defaultSettings = new EmptySettingsProvider(), ) { @@ -40,19 +46,21 @@ public function __construct( /** * {@inheritDoc} * + * @throws Error * @throws Exception */ public function select( string $query, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { + ): Future { return $this->selectWithParams($query, [], $outputFormat, $settings); } /** * {@inheritDoc} * + * @throws Error * @throws Exception */ public function selectWithParams( @@ -60,7 +68,7 @@ public function selectWithParams( array $params, Format $outputFormat, SettingsProvider $settings = new EmptySettingsProvider(), - ): PromiseInterface { + ): Future { $formatClause = $outputFormat::toSql(); $sql = $this->sqlFactory->createWithParameters($query, $params); @@ -72,24 +80,121 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (ResponseInterface $response) => $outputFormat::output( - $response->getBody()->__toString(), - ), + processResponse: static fn (string $body) => $outputFormat::output($body), + ); + } + + /** + * {@inheritDoc} + * + * @param Format> $outputFormat + * + * @throws Error + * @throws Exception + */ + public function selectStream( + string $query, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + return $this->selectStreamWithParams($query, [], $outputFormat, $settings); + } + + /** + * {@inheritDoc} + * + * @param array $params + * @param Format> $outputFormat + * + * @throws Error + * @throws Exception + */ + public function selectStreamWithParams( + string $query, + array $params, + Format $outputFormat, + SettingsProvider $settings = new EmptySettingsProvider(), + ): Future { + $formatClause = $outputFormat::toSql(); + + $sql = $this->sqlFactory->createWithParameters($query, $params); + + return $this->executeStreamRequest( + << $params - * @param (callable(ResponseInterface):mixed)|null $processResponse + * @param callable(string):T $processResponse * + * @return Future + * + * @throws Error * @throws Exception + * + * @template T */ private function executeRequest( string $sql, array $params, SettingsProvider $settings, - callable|null $processResponse, - ): PromiseInterface { + callable $processResponse, + ): Future { + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( + $this->defaultSettings, + $settings, + ), + new RequestOptions( + $params, + ), + ); + + /** @var Future $future */ + $future = async(function () use ($processResponse, $request, $sql): mixed { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $response = $this->client->request($this->toAmpRequest($request)); + $body = $response->getBody()->buffer(); + + if ( + $response->getStatus() !== 200 + || ServerError::bodyContainsStreamedException( + $body, + $response->getHeader('X-ClickHouse-Exception-Tag') ?? '', + ) + ) { + throw ServerError::fromResponseContent($body, $response->getStatus()); + } + + return $processResponse($body); + } finally { + $this->sqlLogger?->stopQuery($id); + } + }); + + return $future; + } + + /** + * @param array $params + * + * @return Future + * + * @throws Error + * @throws Exception + */ + private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future + { $request = $this->requestFactory->prepareSqlRequest( $sql, new RequestSettings( @@ -101,46 +206,44 @@ private function executeRequest( ), ); - $id = uniqid('', true); - $this->sqlLogger?->startQuery($id, $sql); - - return Create::promiseFor( - $this->asyncClient->sendAsyncRequest($request), - ) - ->then( - function (ResponseInterface $response) use ($id, $processResponse) { - $this->sqlLogger?->stopQuery($id); - - if ($response->getStatusCode() !== 200) { - throw ServerError::fromResponse($response); - } - - $body = $response->getBody(); - if (! $body->isSeekable()) { - throw new RuntimeException( - 'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.', - ); - } - - $bodyContent = $body->__toString(); - if ( - ServerError::bodyContainsStreamedException( - $bodyContent, - $response->getHeaderLine('X-ClickHouse-Exception-Tag'), - ) - ) { - throw ServerError::fromResponse($response); - } - - Message::rewindBody($response); - - if ($processResponse === null) { - return $response; - } - - return $processResponse($response); - }, - fn () => $this->sqlLogger?->stopQuery($id), - ); + /** @var Future $future */ + $future = async(function () use ($request, $sql): Payload { + $id = uniqid('', true); + $this->sqlLogger?->startQuery($id, $sql); + + try { + $response = $this->client->request($this->toAmpRequest($request)); + + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent( + $response->getBody()->buffer(), + $response->getStatus(), + ); + } + + return $response->getBody(); + } finally { + $this->sqlLogger?->stopQuery($id); + } + }); + + return $future; + } + + /** @throws Error */ + private function toAmpRequest(RequestInterface $request): AmpRequest + { + $ampRequest = $this->psrAdapter->fromPsrRequest($request); + + foreach ($this->defaultHeaders as $name => $values) { + $ampRequest->setHeader($name, $values); + } + + // ClickHouse queries may be long-running and can stream large result sets. + $ampRequest->setTransferTimeout(0); + $ampRequest->setInactivityTimeout(0); + $ampRequest->setBodySizeLimit(0); + + return $ampRequest; } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index c2ef5d1..ec6224b 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -23,8 +23,11 @@ private function __construct( public static function fromResponse(ResponseInterface $response): self { - $bodyContent = $response->getBody()->__toString(); + return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode()); + } + public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self + { $errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -36,7 +39,7 @@ public static function fromResponse(ResponseInterface $response): self return new self( $bodyContent, $errorCode, - $response->getStatusCode(), + $httpStatusCode, $exceptionName, ); } diff --git a/tests/Client/PsrClickHouseAsyncClientTest.php b/tests/Client/PsrClickHouseAsyncClientTest.php new file mode 100644 index 0000000..b4b3861 --- /dev/null +++ b/tests/Client/PsrClickHouseAsyncClientTest.php @@ -0,0 +1,68 @@ +request = $request; + + return new Response('1.1', 200, null, [], "1\n", $request); + } + }; + + $psr17Factory = new Psr17Factory(); + $client = new PsrClickHouseAsyncClient( + new HttpClient($delegate, []), + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + 'https://clickhouse.example', + ), + new PsrAdapter($psr17Factory, $psr17Factory), + [ + 'X-ClickHouse-Key' => 'secret', + 'X-ClickHouse-User' => 'user', + ], + ); + + $client->select('SELECT 1', new TabSeparated())->await(); + + $request = $delegate->request; + self::assertInstanceOf(Request::class, $request); + self::assertSame('secret', $request->getHeader('X-ClickHouse-Key')); + self::assertSame('user', $request->getHeader('X-ClickHouse-User')); + self::assertSame(0.0, $request->getTransferTimeout()); + self::assertSame(0.0, $request->getInactivityTimeout()); + self::assertSame(0, $request->getBodySizeLimit()); + } +} diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php index 456627a..86c6d05 100644 --- a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -4,9 +4,14 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use Amp\Cancellation; +use Amp\Http\Client\DelegateHttpClient; +use Amp\Http\Client\HttpClient; +use Amp\Http\Client\Psr7\PsrAdapter; +use Amp\Http\Client\Request; +use Amp\Http\Client\Response; +use Amp\Http\InvalidHeaderException; use GuzzleHttp\Psr7\NoSeekStream; -use Http\Client\HttpAsyncClient; -use Http\Promise\FulfilledPromise; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use Psr\Http\Client\ClientInterface; @@ -187,18 +192,23 @@ public function sendRequest(RequestInterface $request): ResponseInterface public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedException(): void { $psr17Factory = new Psr17Factory(); - $response = $psr17Factory->createResponse(200) - ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); - $httpClient = new class ($response) implements HttpAsyncClient { - public function __construct(private ResponseInterface $response) + $delegate = new class (self::streamedExceptionBody()) implements DelegateHttpClient { + public function __construct(private string $body) { } - public function sendAsyncRequest(RequestInterface $request): FulfilledPromise + /** @throws InvalidHeaderException */ + public function request(Request $request, Cancellation $cancellation): Response { - return new FulfilledPromise($this->response); + return new Response( + '1.1', + 200, + null, + ['X-ClickHouse-Exception-Tag' => 'abcdefghijklmnop'], + $this->body, + $request, + ); } }; @@ -219,18 +229,20 @@ public function stopQuery(string $id): void }; $client = new PsrClickHouseAsyncClient( - $httpClient, + new HttpClient($delegate, []), new RequestFactory( new ParamValueConverterRegistry(), $psr17Factory, $psr17Factory, $psr17Factory, ), + new PsrAdapter($psr17Factory, $psr17Factory), + [], $logger, ); try { - $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated())->wait(); + $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated())->await(); self::fail('ServerError was not thrown.'); } catch (ServerError $serverError) { self::assertSame(395, $serverError->getCode()); @@ -242,37 +254,32 @@ public function stopQuery(string $id): void self::assertSame(1, $logger->stopCount); } - public function testAsyncSelectThrowsWhenStreamedExceptionInspectionWouldConsumeNonSeekableBody(): void + public function testAsyncSelectReturnsSuccessfulOkResponseAfterStreamedExceptionInspection(): void { $psr17Factory = new Psr17Factory(); - $response = $psr17Factory->createResponse(200) - ->withBody(new NoSeekStream($psr17Factory->createStream("1\n"))); - $httpClient = new class ($response) implements HttpAsyncClient { - public function __construct(private ResponseInterface $response) + $delegate = new class implements DelegateHttpClient { + /** @throws InvalidHeaderException */ + public function request(Request $request, Cancellation $cancellation): Response { - } - - public function sendAsyncRequest(RequestInterface $request): FulfilledPromise - { - return new FulfilledPromise($this->response); + return new Response('1.1', 200, null, [], "1\n", $request); } }; $client = new PsrClickHouseAsyncClient( - $httpClient, + new HttpClient($delegate, []), new RequestFactory( new ParamValueConverterRegistry(), $psr17Factory, $psr17Factory, $psr17Factory, ), + new PsrAdapter($psr17Factory, $psr17Factory), ); - $this->expectException(RuntimeException::class); - $this->expectExceptionMessage('Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.'); + $output = $client->select('SELECT 1', new TabSeparated())->await(); - $client->select('SELECT 1', new TabSeparated())->wait(); + self::assertSame("1\n", $output->contents); } private static function streamedExceptionBody(): string diff --git a/tests/Client/SelectAsyncTest.php b/tests/Client/SelectAsyncTest.php index bda77ff..46f4fd7 100644 --- a/tests/Client/SelectAsyncTest.php +++ b/tests/Client/SelectAsyncTest.php @@ -4,7 +4,6 @@ namespace SimPod\ClickHouseClient\Tests\Client; -use GuzzleHttp\Promise\Utils; use PHPUnit\Framework\Attributes\CoversClass; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; @@ -15,6 +14,8 @@ use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; +use function Amp\Future\await; + #[CoversClass(RequestFactory::class)] #[CoversClass(PsrClickHouseAsyncClient::class)] #[CoversClass(ServerError::class)] @@ -35,7 +36,7 @@ public function testAsyncSelect(): void /** @var Json $format */ $format = new Json(); - $promises = [ + $futures = [ $client->select($sql, $format), $client->select($sql, $format), ]; @@ -46,7 +47,7 @@ public function testAsyncSelect(): void * \SimPod\ClickHouseClient\Output\Json * } $jsonOutputs */ - $jsonOutputs = Utils::all($promises)->wait(); + $jsonOutputs = await($futures); $expectedData = ClickHouseVersion::quotes64BitIntegersInJson() ? [['number' => '0'], ['number' => '1']] @@ -60,6 +61,24 @@ public function testSelectFromNonExistentTableExpectServerError(): void { $this->expectException(ServerError::class); - self::$asyncClient->select('table', new TabSeparated())->wait(); + self::$asyncClient->select('table', new TabSeparated())->await(); + } + + public function testAsyncSelectStream(): void + { + $stream = self::$asyncClient->selectStream('SELECT 1 AS data', new TabSeparated())->await(); + + self::assertSame("1\n", $stream->buffer()); + } + + public function testAsyncSelectStreamWithParams(): void + { + $stream = self::$asyncClient->selectStreamWithParams( + 'SELECT {p1:UInt8} AS data', + ['p1' => 3], + new TabSeparated(), + )->await(); + + self::assertSame("3\n", $stream->buffer()); } } diff --git a/tests/WithClient.php b/tests/WithClient.php index cea96f4..d3095ef 100644 --- a/tests/WithClient.php +++ b/tests/WithClient.php @@ -4,6 +4,8 @@ namespace SimPod\ClickHouseClient\Tests; +use Amp\Http\Client\HttpClientBuilder; +use Amp\Http\Client\Psr7\PsrAdapter; use InvalidArgumentException; use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\After; @@ -18,12 +20,12 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use Symfony\Component\HttpClient\CurlHttpClient; -use Symfony\Component\HttpClient\HttplugClient; use Symfony\Component\HttpClient\Psr18Client; use function assert; use function getenv; use function is_string; +use function rawurlencode; use function sprintf; use function time; @@ -111,19 +113,16 @@ private static function restartClickHouseClient(): void ); static::$asyncClient = new PsrClickHouseAsyncClient( - new HttplugClient( - new CurlHttpClient([ - 'base_uri' => $endpoint, - 'headers' => $headers, - 'query' => ['database' => static::$currentDbName], - ]), - ), + HttpClientBuilder::buildDefault(), new RequestFactory( new ParamValueConverterRegistry(), new Psr17Factory(), new Psr17Factory(), new Psr17Factory(), + $endpoint . '?database=' . rawurlencode(static::$currentDbName), ), + new PsrAdapter(new Psr17Factory(), new Psr17Factory()), + $headers, ); static::$controllerClient->executeQuery(sprintf('DROP DATABASE IF EXISTS "%s"', static::$currentDbName));