Skip to content

Commit 7ba8809

Browse files
committed
Add Amazon SQS transport for BabelQueue SDK (v1.1.0)
1 parent aa6c0c0 commit 7ba8809

7 files changed

Lines changed: 368 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
The envelope wire format is versioned separately by `meta.schema_version`
88
(currently **1**).
99

10-
## [Unreleased]
10+
## [1.1.0] - 2026-06-12
11+
12+
### Added
13+
- **Amazon SQS transport**`BabelQueue\Transport\SqsTransport`, a framework-less SQS
14+
producer implementing [§3 of the broker-bindings contract](https://babelqueue.com): it
15+
sends the canonical envelope as the `MessageBody` and projects the envelope onto native
16+
SQS `MessageAttributes` (`bq-job`/`bq-trace-id`/`bq-message-id`/`bq-schema-version`/
17+
`bq-source-lang`/`bq-created-at`; FIFO `MessageGroupId`/`MessageDeduplicationId`). It is
18+
decoupled from `aws/aws-sdk-php` behind a one-method `BabelQueue\Transport\SqsClient`
19+
seam, so it is dependency-free and unit-testable with a fake (wrap a real
20+
`Aws\Sqs\SqsClient` in a one-line adapter — see the class docs). The envelope is
21+
unchanged (`schema_version: 1`); SQS is purely additive.
1122

1223
## [1.0.0] - 2026-06-07
1324

@@ -82,7 +93,8 @@ contract live at [babelqueue.com](https://babelqueue.com).
8293
- Framework-agnostic core. Requires PHP `^8.2` and `ext-json` only — no heavy deps.
8394
- Framework adapters (`babelqueue/laravel`, `babelqueue/symfony`) build on this.
8495

85-
[Unreleased]: https://github.com/BabelQueue/php-sdk/compare/v1.0.0...HEAD
96+
[Unreleased]: https://github.com/BabelQueue/php-sdk/compare/v1.1.0...HEAD
97+
[1.1.0]: https://github.com/BabelQueue/php-sdk/compare/v1.0.0...v1.1.0
8698
[1.0.0]: https://github.com/BabelQueue/php-sdk/compare/v0.3.0...v1.0.0
8799
[0.3.0]: https://github.com/BabelQueue/php-sdk/compare/v0.2.0...v0.3.0
88100
[0.2.0]: https://github.com/BabelQueue/php-sdk/compare/v0.1.0...v0.2.0

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
"suggest": {
3636
"ext-redis": "Use the phpredis extension with your own one-method Transport, or install predis/predis for the bundled RedisTransport.",
3737
"php-amqplib/php-amqplib": "For the framework-less RabbitMQ transport (BabelQueue\\Transport\\AmqpTransport).",
38-
"predis/predis": "Pure-PHP Redis client for the framework-less Redis transport (BabelQueue\\Transport\\RedisTransport)."
38+
"predis/predis": "Pure-PHP Redis client for the framework-less Redis transport (BabelQueue\\Transport\\RedisTransport).",
39+
"aws/aws-sdk-php": "For the framework-less Amazon SQS transport (BabelQueue\\Transport\\SqsTransport)."
3940
},
4041
"autoload": {
4142
"psr-4": {

src/Transport/SqsClient.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace BabelQueue\Transport;
6+
7+
/**
8+
* The single SQS operation the framework-less {@see SqsTransport} needs: send a
9+
* message. The AWS SDK's {@code Aws\Sqs\SqsClient} exposes a matching
10+
* {@code sendMessage()} method, so wrap it in a one-line adapter (see the
11+
* {@see SqsTransport} class docs). Keeping the transport behind this tiny seam
12+
* leaves the core dependency-free and unit-testable with a fake.
13+
*/
14+
interface SqsClient
15+
{
16+
/**
17+
* @param array<string, mixed> $args SQS SendMessage parameters.
18+
* @return mixed The AWS result (ignored by the transport).
19+
*/
20+
public function sendMessage(array $args): mixed;
21+
}

src/Transport/SqsTransport.php

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace BabelQueue\Transport;
6+
7+
use BabelQueue\Codec\EnvelopeCodec;
8+
use BabelQueue\Contracts\Transport;
9+
10+
/**
11+
* A framework-less Amazon SQS producer: sends the canonical envelope as the message
12+
* body and projects the envelope onto the native SQS MessageAttributes every
13+
* BabelQueue SDK agrees on, so a non-PHP consumer can route on `bq-job` and trace on
14+
* `bq-trace-id` *without decoding the body first*:
15+
*
16+
* - `bq-job` ← the job URN
17+
* - `bq-trace-id` ← trace_id
18+
* - `bq-message-id` ← meta.id
19+
* - `bq-schema-version` / `bq-source-lang` / `bq-created-at`
20+
*
21+
* Implements §3 of the broker-bindings contract. The envelope is unchanged
22+
* (`schema_version` stays 1); SQS is purely additive.
23+
*
24+
* Optional dependency: `aws/aws-sdk-php`. Wrap your client in one line:
25+
*
26+
* ```php
27+
* $sqs = new \Aws\Sqs\SqsClient([...]);
28+
* $transport = new SqsTransport(
29+
* new class ($sqs) implements SqsClient {
30+
* public function __construct(private \Aws\Sqs\SqsClient $c) {}
31+
* public function sendMessage(array $args): mixed { return $this->c->sendMessage($args); }
32+
* },
33+
* 'https://sqs.eu-central-1.amazonaws.com/123456789012/orders',
34+
* );
35+
* ```
36+
*/
37+
final class SqsTransport implements Transport
38+
{
39+
public function __construct(
40+
private readonly SqsClient $client,
41+
private readonly string $queueUrl,
42+
private readonly bool $fifo = false,
43+
private readonly ?string $messageGroupId = null,
44+
private readonly bool $contentDedup = false,
45+
) {
46+
}
47+
48+
public function publish(string $payload, ?string $queue = null): ?string
49+
{
50+
$url = $queue ?? $this->queueUrl;
51+
$envelope = EnvelopeCodec::decode($payload);
52+
$meta = is_array($envelope['meta'] ?? null) ? $envelope['meta'] : [];
53+
54+
$args = [
55+
'QueueUrl' => $url,
56+
'MessageBody' => $payload,
57+
'MessageAttributes' => $this->attributes($envelope, $meta),
58+
];
59+
60+
if ($this->fifo) {
61+
$args['MessageGroupId'] = $this->messageGroupId ?? $this->queueName($url);
62+
if (! $this->contentDedup && isset($meta['id']) && is_scalar($meta['id'])) {
63+
$args['MessageDeduplicationId'] = (string) $meta['id'];
64+
}
65+
}
66+
67+
$this->client->sendMessage($args);
68+
69+
$id = $meta['id'] ?? null;
70+
71+
return is_scalar($id) ? (string) $id : null;
72+
}
73+
74+
/**
75+
* @param array<string, mixed> $envelope
76+
* @param array<string, mixed> $meta
77+
* @return array<string, array{DataType: string, StringValue: string}>
78+
*/
79+
private function attributes(array $envelope, array $meta): array
80+
{
81+
$attributes = [];
82+
83+
$urn = EnvelopeCodec::urn($envelope);
84+
if ($urn !== '') {
85+
$attributes['bq-job'] = self::string($urn);
86+
}
87+
88+
$traceId = $envelope['trace_id'] ?? null;
89+
if (is_string($traceId) && $traceId !== '') {
90+
$attributes['bq-trace-id'] = self::string($traceId);
91+
}
92+
93+
if (isset($meta['id']) && is_scalar($meta['id'])) {
94+
$attributes['bq-message-id'] = self::string((string) $meta['id']);
95+
}
96+
if (isset($meta['schema_version']) && is_scalar($meta['schema_version'])) {
97+
$attributes['bq-schema-version'] = self::number((string) $meta['schema_version']);
98+
}
99+
if (isset($meta['lang']) && is_string($meta['lang']) && $meta['lang'] !== '') {
100+
$attributes['bq-source-lang'] = self::string($meta['lang']);
101+
}
102+
if (isset($meta['created_at']) && is_scalar($meta['created_at'])) {
103+
$attributes['bq-created-at'] = self::number((string) $meta['created_at']);
104+
}
105+
106+
return $attributes;
107+
}
108+
109+
/**
110+
* @return array{DataType: string, StringValue: string}
111+
*/
112+
private static function string(string $value): array
113+
{
114+
return ['DataType' => 'String', 'StringValue' => $value];
115+
}
116+
117+
/**
118+
* @return array{DataType: string, StringValue: string}
119+
*/
120+
private static function number(string $value): array
121+
{
122+
return ['DataType' => 'Number', 'StringValue' => $value];
123+
}
124+
125+
private function queueName(string $url): string
126+
{
127+
$segments = array_values(array_filter(explode('/', $url), static fn (string $s): bool => $s !== ''));
128+
129+
return $segments === [] ? 'default' : (string) end($segments);
130+
}
131+
}

tests/SqsConformanceTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace BabelQueue\Tests;
6+
7+
use BabelQueue\Transport\SqsClient;
8+
use BabelQueue\Transport\SqsTransport;
9+
use Mockery;
10+
use Mockery\Adapter\Phpunit\MockeryPHPUnitIntegration;
11+
use PHPUnit\Framework\TestCase;
12+
13+
/**
14+
* Amazon SQS binding conformance against the vendored canonical suite's `sqs` block.
15+
*
16+
* The `php-sdk` ships the produce-side `SqsTransport`, so it satisfies
17+
* `attribute_projection`. The consume-side `attempts_reconciliation` is exercised by
18+
* the framework-less runtime SDKs (Go/Python/Node/Java/.NET); the Laravel drop-in
19+
* driver surfaces the broker's native count instead (exempt per the manifest note).
20+
*/
21+
final class SqsConformanceTest extends TestCase
22+
{
23+
use MockeryPHPUnitIntegration;
24+
25+
public function test_attribute_projection_matches_the_golden(): void
26+
{
27+
$manifest = json_decode(
28+
(string) file_get_contents(__DIR__ . '/conformance/manifest.json'),
29+
true,
30+
512,
31+
JSON_THROW_ON_ERROR,
32+
);
33+
$projection = $manifest['sqs']['attribute_projection'];
34+
$body = (string) file_get_contents(__DIR__ . '/conformance/' . $projection['envelope_file']);
35+
36+
$captured = null;
37+
$client = Mockery::mock(SqsClient::class);
38+
$client->shouldReceive('sendMessage')->once()->with(
39+
Mockery::on(function (array $args) use (&$captured): bool {
40+
$captured = $args;
41+
42+
return true;
43+
}),
44+
);
45+
46+
(new SqsTransport($client, 'https://sqs.eu-central-1.amazonaws.com/123456789012/orders'))->publish($body);
47+
48+
$attributes = $captured['MessageAttributes'];
49+
$want = $projection['message_attributes'];
50+
51+
$this->assertCount(count($want), $attributes);
52+
foreach ($want as $key => $expected) {
53+
$this->assertArrayHasKey($key, $attributes, $key);
54+
$this->assertSame($expected['DataType'], $attributes[$key]['DataType'], $key);
55+
$this->assertSame($expected['StringValue'], $attributes[$key]['StringValue'], $key);
56+
}
57+
}
58+
}

tests/SqsTransportTest.php

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace BabelQueue\Tests;
6+
7+
use BabelQueue\Transport\SqsClient;
8+
use BabelQueue\Transport\SqsTransport;
9+
use Mockery;
10+
use Mockery\Adapter\Phpunit\MockeryPHPUnitIntegration;
11+
use PHPUnit\Framework\TestCase;
12+
13+
/**
14+
* The framework-less Amazon SQS producer: the canonical envelope as the message
15+
* body, projected onto native SQS MessageAttributes (bq-job/bq-trace-id/... + the
16+
* Number-typed schema-version/created-at) so a non-PHP consumer can route and trace
17+
* without decoding the body.
18+
*/
19+
final class SqsTransportTest extends TestCase
20+
{
21+
use MockeryPHPUnitIntegration;
22+
23+
private const URL = 'https://sqs.eu-central-1.amazonaws.com/123456789012/orders';
24+
25+
private const ENVELOPE = '{"job":"urn:babel:orders:created","trace_id":"trace-1",'
26+
.'"data":{"order_id":1042},"meta":{"id":"msg-1","queue":"orders","lang":"php",'
27+
.'"schema_version":1,"created_at":1749132727000},"attempts":0}';
28+
29+
public function test_publish_projects_contract_attributes(): void
30+
{
31+
$captured = null;
32+
33+
$client = Mockery::mock(SqsClient::class);
34+
$client->shouldReceive('sendMessage')->once()->with(
35+
Mockery::on(function (array $args) use (&$captured): bool {
36+
$captured = $args;
37+
38+
return true;
39+
}),
40+
);
41+
42+
$id = (new SqsTransport($client, self::URL))->publish(self::ENVELOPE);
43+
44+
$this->assertSame('msg-1', $id);
45+
$this->assertSame(self::URL, $captured['QueueUrl']);
46+
$this->assertSame(self::ENVELOPE, $captured['MessageBody']);
47+
48+
$attrs = $captured['MessageAttributes'];
49+
$this->assertSame(['DataType' => 'String', 'StringValue' => 'urn:babel:orders:created'], $attrs['bq-job']);
50+
$this->assertSame(['DataType' => 'String', 'StringValue' => 'trace-1'], $attrs['bq-trace-id']);
51+
$this->assertSame(['DataType' => 'String', 'StringValue' => 'msg-1'], $attrs['bq-message-id']);
52+
$this->assertSame(['DataType' => 'Number', 'StringValue' => '1'], $attrs['bq-schema-version']);
53+
$this->assertSame(['DataType' => 'String', 'StringValue' => 'php'], $attrs['bq-source-lang']);
54+
$this->assertSame(['DataType' => 'Number', 'StringValue' => '1749132727000'], $attrs['bq-created-at']);
55+
$this->assertArrayNotHasKey('MessageGroupId', $captured);
56+
}
57+
58+
public function test_publish_uses_the_queue_override_url(): void
59+
{
60+
$other = 'https://sqs.eu-central-1.amazonaws.com/123456789012/billing';
61+
$captured = null;
62+
63+
$client = Mockery::mock(SqsClient::class);
64+
$client->shouldReceive('sendMessage')->once()->with(
65+
Mockery::on(function (array $args) use (&$captured): bool {
66+
$captured = $args;
67+
68+
return true;
69+
}),
70+
);
71+
72+
(new SqsTransport($client, self::URL))->publish(self::ENVELOPE, $other);
73+
74+
$this->assertSame($other, $captured['QueueUrl']);
75+
}
76+
77+
public function test_fifo_sets_group_and_dedup(): void
78+
{
79+
$captured = null;
80+
81+
$client = Mockery::mock(SqsClient::class);
82+
$client->shouldReceive('sendMessage')->once()->with(
83+
Mockery::on(function (array $args) use (&$captured): bool {
84+
$captured = $args;
85+
86+
return true;
87+
}),
88+
);
89+
90+
(new SqsTransport($client, self::URL.'.fifo', fifo: true))->publish(self::ENVELOPE);
91+
92+
$this->assertSame('orders.fifo', $captured['MessageGroupId']);
93+
$this->assertSame('msg-1', $captured['MessageDeduplicationId']);
94+
}
95+
96+
public function test_content_dedup_omits_dedup_id(): void
97+
{
98+
$captured = null;
99+
100+
$client = Mockery::mock(SqsClient::class);
101+
$client->shouldReceive('sendMessage')->once()->with(
102+
Mockery::on(function (array $args) use (&$captured): bool {
103+
$captured = $args;
104+
105+
return true;
106+
}),
107+
);
108+
109+
(new SqsTransport($client, self::URL.'.fifo', fifo: true, messageGroupId: 'grp', contentDedup: true))
110+
->publish(self::ENVELOPE);
111+
112+
$this->assertSame('grp', $captured['MessageGroupId']);
113+
$this->assertArrayNotHasKey('MessageDeduplicationId', $captured);
114+
}
115+
}

0 commit comments

Comments
 (0)