Skip to content
This repository was archived by the owner on Sep 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,56 @@ public function getJobResults(string $queryJobId, string $statementId): array
return $this->sendRequest('GET', $url);
}

/**
* Execute a workspace query and wait for results
*
* @param array{statements: string[], transactional?: bool} $requestBody
* @return array<string, mixed>
*/
public function executeWorkspaceQuery(string $branchId, string $workspaceId, array $requestBody): array
{
// Submit the query job
$response = $this->submitQueryJob($branchId, $workspaceId, $requestBody);

if (!isset($response['queryJobId']) || !is_string($response['queryJobId'])) {
throw new ClientException('Invalid response from submitQueryJob: missing queryJobId');
}

$queryJobId = $response['queryJobId'];

// Wait for job completion
$finalStatus = $this->waitForJobCompletion($queryJobId);

if (!isset($finalStatus['status']) || $finalStatus['status'] !== 'completed') {
/** @var string $status */
$status = $finalStatus['status'] ?? 'unknown';
throw new ClientException(
sprintf('Query job failed with status: %s', $status),
$status === 'failed' ? 500 : 0,
);
}

// Get results for all completed statements
$results = [];
if (isset($finalStatus['statements']) && is_array($finalStatus['statements'])) {
foreach ($finalStatus['statements'] as $statement) {
if (is_array($statement) && isset($statement['id']) && isset($statement['status'])) {
if ($statement['status'] === 'completed') {
$statementResults = $this->getJobResults($queryJobId, $statement['id']);
$results[] = $statementResults;
}
}
}
}

return [
'queryJobId' => $queryJobId,
'status' => $finalStatus['status'],
'statements' => $finalStatus['statements'] ?? [],
'results' => $results,
];
}

/**
* Health check
*
Expand Down Expand Up @@ -269,4 +319,29 @@ private function handleGuzzleException(GuzzleException $e): void

throw new ClientException('Query Service API request failed: ' . $e->getMessage(), 0, $e);
}

/**
* Wait for job completion with timeout
*
* @param int $maxWaitSeconds Maximum time to wait in seconds
* @return array<string, mixed>
*/
public function waitForJobCompletion(string $queryJobId, int $maxWaitSeconds = 30): array
{
$startTime = time();

while (time() - $startTime < $maxWaitSeconds) {
$status = $this->getJobStatus($queryJobId);

if (in_array($status['status'], ['completed', 'failed', 'canceled'], true)) {
return $status;
}

sleep(1);
}

throw new ClientException(
sprintf('Job %s did not complete within %d seconds', $queryJobId, $maxWaitSeconds),
);
}
}
24 changes: 1 addition & 23 deletions tests/Functional/BaseFunctionalTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,30 +158,8 @@ protected function createTestTable(?string $tableName = null): string

// Wait for completion
assert(is_string($response['queryJobId']));
$this->waitForJobCompletion($response['queryJobId']);
$this->queryClient->waitForJobCompletion($response['queryJobId']);

return $tableName;
}

/**
* @return array<string, mixed>
*/
protected function waitForJobCompletion(string $queryJobId, int $maxWaitSeconds = 30): array
{
$startTime = time();
$attempt = 1;

while (time() - $startTime < $maxWaitSeconds) {
$status = $this->queryClient->getJobStatus($queryJobId);
if (in_array($status['status'], ['completed', 'failed', 'canceled'], true)) {
return $status;
}
sleep(1);
$attempt++;
}

throw new RuntimeException(
sprintf('Job %s did not complete within %d seconds', $queryJobId, $maxWaitSeconds),
);
}
}
59 changes: 57 additions & 2 deletions tests/Functional/BasicQueryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function testSubmitSimpleSelectQuery(): void
$this->assertNotEmpty($queryJobId);

// Wait for job completion
$finalStatus = $this->waitForJobCompletion($queryJobId);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId);

$this->assertEquals('completed', $finalStatus['status']);
$this->assertEquals($queryJobId, $finalStatus['queryJobId']);
Expand Down Expand Up @@ -80,7 +80,7 @@ public function testSubmitInformationSchemaQuery(): void
$this->assertNotEmpty($queryJobId);

// Wait for job completion
$finalStatus = $this->waitForJobCompletion($queryJobId);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId);

$this->assertEquals('completed', $finalStatus['status']);
$this->assertEquals($queryJobId, $finalStatus['queryJobId']);
Expand Down Expand Up @@ -113,4 +113,59 @@ public function testSubmitInformationSchemaQuery(): void
$this->assertIsNumeric($row[0]);
$this->assertGreaterThanOrEqual(0, (int) $row[0]);
}

public function testExecuteWorkspaceQuery(): void
{
// Test the new executeWorkspaceQuery method with a simple query
$response = $this->queryClient->executeWorkspaceQuery(
$this->getTestBranchId(),
$this->getTestWorkspaceId(),
[
'statements' => ['SELECT CURRENT_TIMESTAMP() AS "current_time"'],
'transactional' => false,
],
);

// Verify the response structure
$this->assertArrayHasKey('queryJobId', $response);
$this->assertArrayHasKey('status', $response);
$this->assertArrayHasKey('statements', $response);
$this->assertArrayHasKey('results', $response);

// Verify job completed successfully
$this->assertEquals('completed', $response['status']);
$this->assertNotEmpty($response['queryJobId']);

// Verify statements
$statements = $response['statements'];
assert(is_array($statements));
$this->assertCount(1, $statements);

$statement = $statements[0];
assert(is_array($statement));
$this->assertEquals('completed', $statement['status']);

// Verify results
$results = $response['results'];
assert(is_array($results));
$this->assertCount(1, $results);

$result = $results[0];
assert(is_array($result));
$this->assertEquals('completed', $result['status']);

// Verify we got timestamp data
$this->assertArrayHasKey('data', $result);
$data = $result['data'];
assert(is_array($data));
$this->assertCount(1, $data);
$row = $data[0];
assert(is_array($row));
$this->assertCount(1, $row);
// Query API returns indexed arrays, not associative arrays with column names
assert(isset($row[0]) && is_string($row[0]));
$this->assertNotEmpty($row[0]);
// Verify it's a valid timestamp (numeric string)
$this->assertMatchesRegularExpression('/^\d+\.\d+$/', $row[0]);
}
}
10 changes: 5 additions & 5 deletions tests/Functional/QueryServiceFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public function testSubmitAndGetSimpleQuery(): void
$this->assertNotEmpty($queryJobId);

// Wait for job completion
$finalStatus = $this->waitForJobCompletion($queryJobId);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId);

$this->assertEquals('completed', $finalStatus['status']);
$this->assertEquals($queryJobId, $finalStatus['queryJobId']);
Expand Down Expand Up @@ -97,7 +97,7 @@ public function testSubmitTransactionalQuery(): void
assert(is_string($queryJobId));

// Wait for completion
$finalStatus = $this->waitForJobCompletion($queryJobId);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId);

$this->assertEquals('completed', $finalStatus['status']);
$this->assertArrayHasKey('statements', $finalStatus);
Expand Down Expand Up @@ -161,7 +161,7 @@ public function testCancelQueryJob(): void
$this->assertEquals($queryJobId, $cancelResponse['queryJobId']);

// Wait for final status
$finalStatus = $this->waitForJobCompletion($queryJobId, 15);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId, 15);

// Job should be canceled
$this->assertEquals('canceled', $finalStatus['status']);
Expand Down Expand Up @@ -193,7 +193,7 @@ public function testQueryJobWithInvalidSQL(): void
assert(is_string($queryJobId));

// Wait for job completion
$finalStatus = $this->waitForJobCompletion($queryJobId);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId);

// Job should fail due to invalid SQL
$this->assertEquals('failed', $finalStatus['status']);
Expand Down Expand Up @@ -241,7 +241,7 @@ public function testQueryJobWithInvalidBranch(): void
assert(is_string($queryJobId));

// Wait for job completion
$finalStatus = $this->waitForJobCompletion($queryJobId);
$finalStatus = $this->queryClient->waitForJobCompletion($queryJobId);

// Query Service accepts invalid branch IDs and executes successfully
$this->assertEquals('completed', $finalStatus['status']);
Expand Down