Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
},
"require": {
"php": "^8.4",
"guzzlehttp/promises": "^2.0",
"amphp/amp": "^3.1",
"amphp/http-client": "^5.3",
"amphp/http-client-psr7": "^1.1",
"guzzlehttp/psr7": "^2.6",
"php-http/client-common": "^2.0",
"psr/http-client": "^1.0",
"psr/http-factory": "^1.0",
"psr/http-message": "^2.0",
Expand Down
35 changes: 32 additions & 3 deletions src/Client/ClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

namespace SimPod\ClickHouseClient\Client;

use GuzzleHttp\Promise\PromiseInterface;
use Amp\ByteStream\Payload;
use Amp\Future;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Settings\EmptySettingsProvider;
Expand All @@ -15,24 +16,52 @@ interface ClickHouseAsyncClient
/**
* @param Format<O> $outputFormat
*
* @return Future<O>
*
* @template O of Output
*/
public function select(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface;
): Future;

/**
* @param array<string, mixed> $params
* @param Format<O> $outputFormat
*
* @return Future<O>
*
* @template O of Output
*/
public function selectWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface;
): Future;

/**
* @param Format<Output<mixed>> $outputFormat
*
* @return Future<Payload>
*/
public function selectStream(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future;

/**
* @param array<string, mixed> $params
* @param Format<Output<mixed>> $outputFormat
*
* @return Future<Payload>
*/
public function selectStreamWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future;
}
215 changes: 159 additions & 56 deletions src/Client/PsrClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,39 @@

namespace SimPod\ClickHouseClient\Client;

use Amp\ByteStream\Payload;
use Amp\Future;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Psr7\PsrAdapter;
use Amp\Http\Client\Request as AmpRequest;
use Error;
use Exception;
use GuzzleHttp\Promise\Create;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Psr7\Message;
use Http\Client\HttpAsyncClient;
use Psr\Http\Message\ResponseInterface;
use RuntimeException;
use Psr\Http\Message\RequestInterface;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\Http\RequestOptions;
use SimPod\ClickHouseClient\Client\Http\RequestSettings;
use SimPod\ClickHouseClient\Exception\ServerError;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Logger\SqlLogger;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Settings\EmptySettingsProvider;
use SimPod\ClickHouseClient\Settings\SettingsProvider;
use SimPod\ClickHouseClient\Sql\SqlFactory;
use SimPod\ClickHouseClient\Sql\ValueFormatter;

use function Amp\async;
use function uniqid;

class PsrClickHouseAsyncClient implements ClickHouseAsyncClient
{
private SqlFactory $sqlFactory;

/** @param array<non-empty-string, string|string[]> $defaultHeaders */
public function __construct(
private HttpAsyncClient $asyncClient,
private HttpClient $client,
private RequestFactory $requestFactory,
private PsrAdapter $psrAdapter,
private array $defaultHeaders = [],
private SqlLogger|null $sqlLogger = null,
private SettingsProvider $defaultSettings = new EmptySettingsProvider(),
) {
Expand All @@ -40,27 +46,29 @@
/**
* {@inheritDoc}
*
* @throws Error
* @throws Exception
*/
public function select(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface {
): Future {
return $this->selectWithParams($query, [], $outputFormat, $settings);
}

/**
* {@inheritDoc}
*
* @throws Error
* @throws Exception
*/
public function selectWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): PromiseInterface {
): Future {
$formatClause = $outputFormat::toSql();

$sql = $this->sqlFactory->createWithParameters($query, $params);
Expand All @@ -72,24 +80,121 @@
CLICKHOUSE,
params: $params,
settings: $settings,
processResponse: static fn (ResponseInterface $response) => $outputFormat::output(
$response->getBody()->__toString(),
),
processResponse: static fn (string $body) => $outputFormat::output($body),
);
}

/**
* {@inheritDoc}
*
* @param Format<Output<mixed>> $outputFormat
*
* @throws Error
* @throws Exception
*/
public function selectStream(
string $query,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future {
return $this->selectStreamWithParams($query, [], $outputFormat, $settings);
}

/**
* {@inheritDoc}
*
* @param array<string, mixed> $params
* @param Format<Output<mixed>> $outputFormat
*
* @throws Error
* @throws Exception
*/
public function selectStreamWithParams(
string $query,
array $params,
Format $outputFormat,
SettingsProvider $settings = new EmptySettingsProvider(),
): Future {
$formatClause = $outputFormat::toSql();

$sql = $this->sqlFactory->createWithParameters($query, $params);

return $this->executeStreamRequest(
<<<CLICKHOUSE
$sql
$formatClause
CLICKHOUSE,
params: $params,
settings: $settings,
);
}

/**
* @param array<string, mixed> $params
* @param (callable(ResponseInterface):mixed)|null $processResponse
* @param callable(string):T $processResponse
*
* @return Future<T>
*
* @throws Error
* @throws Exception
*
* @template T
*/
private function executeRequest(
string $sql,
array $params,
SettingsProvider $settings,
callable|null $processResponse,
): PromiseInterface {
callable $processResponse,
): Future {
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
$this->defaultSettings,
$settings,
),
new RequestOptions(
$params,
),
);
Comment on lines 143 to +158

/** @var Future<T> $future */
$future = async(function () use ($processResponse, $request, $sql): mixed {
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

try {
$response = $this->client->request($this->toAmpRequest($request));
$body = $response->getBody()->buffer();

if (
$response->getStatus() !== 200
|| ServerError::bodyContainsStreamedException(
$body,
$response->getHeader('X-ClickHouse-Exception-Tag') ?? '',

Check warning on line 173 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "Coalesce": @@ @@ $response->getStatus() !== 200 || ServerError::bodyContainsStreamedException( $body, - $response->getHeader('X-ClickHouse-Exception-Tag') ?? '', + '' ?? $response->getHeader('X-ClickHouse-Exception-Tag'), ) ) { throw ServerError::fromResponseContent($body, $response->getStatus());
)
) {
throw ServerError::fromResponseContent($body, $response->getStatus());
}

return $processResponse($body);
} finally {
$this->sqlLogger?->stopQuery($id);
}
});

return $future;
}

/**
* @param array<string, mixed> $params
*
* @return Future<Payload>
*
* @throws Error
* @throws Exception
*/
private function executeStreamRequest(string $sql, array $params, SettingsProvider $settings): Future
{
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
Expand All @@ -101,46 +206,44 @@
),
);

$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

return Create::promiseFor(
$this->asyncClient->sendAsyncRequest($request),
)
->then(
function (ResponseInterface $response) use ($id, $processResponse) {
$this->sqlLogger?->stopQuery($id);

if ($response->getStatusCode() !== 200) {
throw ServerError::fromResponse($response);
}

$body = $response->getBody();
if (! $body->isSeekable()) {
throw new RuntimeException(
'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.',
);
}

$bodyContent = $body->__toString();
if (
ServerError::bodyContainsStreamedException(
$bodyContent,
$response->getHeaderLine('X-ClickHouse-Exception-Tag'),
)
) {
throw ServerError::fromResponse($response);
}

Message::rewindBody($response);

if ($processResponse === null) {
return $response;
}

return $processResponse($response);
},
fn () => $this->sqlLogger?->stopQuery($id),
);
/** @var Future<Payload> $future */
$future = async(function () use ($request, $sql): Payload {
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

try {

Check warning on line 214 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "UnwrapFinally": @@ @@ $future = async(function () use ($request, $sql): Payload { $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); - - try { - $response = $this->client->request($this->toAmpRequest($request)); - - if ($response->getStatus() !== 200) { - throw ServerError::fromResponseContent( - $response->getBody()->buffer(), - $response->getStatus(), - ); - } - - return $response->getBody(); - } finally { - $this->sqlLogger?->stopQuery($id); + $response = $this->client->request($this->toAmpRequest($request)); + if ($response->getStatus() !== 200) { + throw ServerError::fromResponseContent( + $response->getBody()->buffer(), + $response->getStatus(), + ); } + return $response->getBody(); + $this->sqlLogger?->stopQuery($id); }); return $future;
$response = $this->client->request($this->toAmpRequest($request));

if ($response->getStatus() !== 200) {
throw ServerError::fromResponseContent(
$response->getBody()->buffer(),
$response->getStatus(),
);
}

return $response->getBody();
} finally {
$this->sqlLogger?->stopQuery($id);
}
});

return $future;
}

/** @throws Error */
private function toAmpRequest(RequestInterface $request): AmpRequest
{
$ampRequest = $this->psrAdapter->fromPsrRequest($request);

foreach ($this->defaultHeaders as $name => $values) {
$ampRequest->setHeader($name, $values);
}

// ClickHouse queries may be long-running and can stream large result sets.
$ampRequest->setTransferTimeout(0);

Check warning on line 243 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "DecrementInteger": @@ @@ } // ClickHouse queries may be long-running and can stream large result sets. - $ampRequest->setTransferTimeout(0); + $ampRequest->setTransferTimeout(-1); $ampRequest->setInactivityTimeout(0); $ampRequest->setBodySizeLimit(0);
$ampRequest->setInactivityTimeout(0);

Check warning on line 244 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "DecrementInteger": @@ @@ // ClickHouse queries may be long-running and can stream large result sets. $ampRequest->setTransferTimeout(0); - $ampRequest->setInactivityTimeout(0); + $ampRequest->setInactivityTimeout(-1); $ampRequest->setBodySizeLimit(0); return $ampRequest;
$ampRequest->setBodySizeLimit(0);

return $ampRequest;
}
}
7 changes: 5 additions & 2 deletions src/Exception/ServerError.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ private function __construct(

public static function fromResponse(ResponseInterface $response): self
{
$bodyContent = $response->getBody()->__toString();
return self::fromResponseContent($response->getBody()->__toString(), $response->getStatusCode());
}

public static function fromResponseContent(string $bodyContent, int $httpStatusCode): self
{
$errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1
? (int) $codeMatches[1]
: 0;
Expand All @@ -36,7 +39,7 @@ public static function fromResponse(ResponseInterface $response): self
return new self(
$bodyContent,
$errorCode,
$response->getStatusCode(),
$httpStatusCode,
$exceptionName,
);
}
Expand Down
Loading
Loading