Skip to content

Commit 29e2213

Browse files
authored
Merge pull request #6354 from KennethRuan/kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch
2 parents 526c821 + dda70b4 commit 29e2213

7 files changed

Lines changed: 404 additions & 9 deletions

File tree

src/workerd/api/queue.c++

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,74 @@ kj::Promise<void> WorkerQueue::send(
243243
.attach(context.registerPendingEvent());
244244
};
245245

246+
jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock& js,
247+
jsg::JsValue body,
248+
jsg::Optional<SendOptions> options,
249+
const jsg::TypeHandler<SendResponse>& responseHandler) {
250+
auto& context = IoContext::current();
251+
252+
JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");
253+
254+
auto headers = kj::HttpHeaders(context.getHeaderTable());
255+
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());
256+
257+
kj::Maybe<kj::StringPtr> contentType;
258+
KJ_IF_SOME(opts, options) {
259+
KJ_IF_SOME(type, opts.contentType) {
260+
auto validatedType = validateContentType(type);
261+
headers.addPtrPtr(HDR_MSG_FORMAT, validatedType);
262+
contentType = validatedType;
263+
}
264+
KJ_IF_SOME(secs, opts.delaySeconds) {
265+
headers.addPtr(HDR_MSG_DELAY, kj::str(secs));
266+
}
267+
}
268+
269+
Serialized serialized;
270+
KJ_IF_SOME(type, contentType) {
271+
serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY);
272+
} else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) {
273+
headers.addPtrPtr("X-Msg-Fmt", IncomingQueueMessage::ContentType::JSON);
274+
serialized = serialize(
275+
js, body, IncomingQueueMessage::ContentType::JSON, SerializeArrayBufferBehavior::DEEP_COPY);
276+
} else {
277+
serialized = serializeV8(js, body);
278+
}
279+
280+
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
281+
auto req = client->request(
282+
kj::HttpMethod::POST, "https://fake-host/message"_kjc, headers, serialized.data.size());
283+
284+
const auto& headerIds = context.getHeaderIds();
285+
const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes();
286+
287+
static constexpr auto handleSend = [](auto req, auto serialized, auto client, auto& headerIds,
288+
bool exposeErrorCodes) -> kj::Promise<kj::String> {
289+
co_await req.body->write(serialized.data);
290+
auto response = co_await req.response;
291+
292+
if (exposeErrorCodes) {
293+
JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));
294+
} else {
295+
JSG_REQUIRE(
296+
response.statusCode == 200, Error, kj::str("Queue send failed: ", response.statusText));
297+
}
298+
299+
auto responseBody = co_await response.body->readAllBytes();
300+
co_return kj::str(responseBody.asChars());
301+
};
302+
303+
auto promise =
304+
handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);
305+
306+
return context.awaitIo(
307+
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
308+
auto parsed = jsg::JsValue::fromJson(js, text);
309+
return JSG_REQUIRE_NONNULL(
310+
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
311+
});
312+
}
313+
246314
kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
247315
jsg::Sequence<MessageSendRequest> batch,
248316
jsg::Optional<SendBatchOptions> options) {
@@ -394,6 +462,120 @@ jsg::Promise<WorkerQueue::Metrics> WorkerQueue::metrics(
394462
});
395463
}
396464

465+
jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(jsg::Lock& js,
466+
jsg::Sequence<MessageSendRequest> batch,
467+
jsg::Optional<SendBatchOptions> options,
468+
const jsg::TypeHandler<SendBatchResponse>& responseHandler) {
469+
auto& context = IoContext::current();
470+
471+
JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message");
472+
473+
size_t totalSize = 0;
474+
size_t largestMessage = 0;
475+
auto messageCount = batch.size();
476+
auto builder = kj::heapArrayBuilder<SerializedWithOptions>(messageCount);
477+
for (auto& message: batch) {
478+
auto body = message.body.getHandle(js);
479+
JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");
480+
481+
SerializedWithOptions item;
482+
KJ_IF_SOME(secs, message.delaySeconds) {
483+
item.delaySeconds = secs;
484+
}
485+
486+
KJ_IF_SOME(contentType, message.contentType) {
487+
item.contentType = validateContentType(contentType);
488+
item.body = serialize(js, body, contentType, SerializeArrayBufferBehavior::SHALLOW_REFERENCE);
489+
} else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) {
490+
item.contentType = IncomingQueueMessage::ContentType::JSON;
491+
item.body = serialize(js, body, IncomingQueueMessage::ContentType::JSON,
492+
SerializeArrayBufferBehavior::SHALLOW_REFERENCE);
493+
} else {
494+
item.body = serializeV8(js, body);
495+
}
496+
497+
builder.add(kj::mv(item));
498+
totalSize += builder.back().body.data.size();
499+
largestMessage = kj::max(largestMessage, builder.back().body.data.size());
500+
}
501+
auto serializedBodies = builder.finish();
502+
503+
auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 64 + 32;
504+
kj::Vector<char> bodyBuilder(estimatedSize);
505+
bodyBuilder.addAll("{\"messages\":["_kj);
506+
for (size_t i = 0; i < messageCount; ++i) {
507+
bodyBuilder.addAll("{\"body\":\""_kj);
508+
bodyBuilder.addAll(kj::encodeBase64(serializedBodies[i].body.data));
509+
bodyBuilder.add('"');
510+
511+
KJ_IF_SOME(contentType, serializedBodies[i].contentType) {
512+
bodyBuilder.addAll(",\"contentType\":\""_kj);
513+
bodyBuilder.addAll(contentType);
514+
bodyBuilder.add('"');
515+
}
516+
517+
KJ_IF_SOME(delaySecs, serializedBodies[i].delaySeconds) {
518+
bodyBuilder.addAll(",\"delaySecs\": "_kj);
519+
bodyBuilder.addAll(kj::str(delaySecs));
520+
}
521+
522+
bodyBuilder.addAll("}"_kj);
523+
if (i < messageCount - 1) {
524+
bodyBuilder.add(',');
525+
}
526+
}
527+
bodyBuilder.addAll("]}"_kj);
528+
bodyBuilder.add('\0');
529+
KJ_DASSERT(bodyBuilder.size() <= estimatedSize);
530+
kj::String body(bodyBuilder.releaseAsArray());
531+
KJ_DASSERT(jsg::JsValue::fromJson(js, body).isObject());
532+
533+
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
534+
535+
auto headers = kj::HttpHeaders(context.getHeaderTable());
536+
headers.addPtr("CF-Queue-Batch-Count"_kj, kj::str(messageCount));
537+
headers.addPtr("CF-Queue-Batch-Bytes"_kj, kj::str(totalSize));
538+
headers.addPtr("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage));
539+
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString());
540+
541+
KJ_IF_SOME(opts, options) {
542+
KJ_IF_SOME(secs, opts.delaySeconds) {
543+
headers.addPtr(HDR_MSG_DELAY, kj::str(secs));
544+
}
545+
}
546+
547+
auto req =
548+
client->request(kj::HttpMethod::POST, "https://fake-host/batch"_kjc, headers, body.size());
549+
550+
const auto& headerIds = context.getHeaderIds();
551+
const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes();
552+
static constexpr auto handleWrite = [](auto req, auto body, auto client, auto& headerIds,
553+
bool exposeErrorCodes) -> kj::Promise<kj::String> {
554+
co_await req.body->write(body.asBytes());
555+
auto response = co_await req.response;
556+
557+
if (exposeErrorCodes) {
558+
JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));
559+
} else {
560+
JSG_REQUIRE(response.statusCode == 200, Error,
561+
kj::str("Queue sendBatch failed: ", response.statusText));
562+
}
563+
564+
auto responseBody = co_await response.body->readAllBytes();
565+
co_return kj::str(responseBody.asChars());
566+
};
567+
568+
auto promise =
569+
handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes);
570+
571+
return context.awaitIo(
572+
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse {
573+
auto parsed = jsg::JsValue::fromJson(js, text);
574+
return JSG_REQUIRE_NONNULL(
575+
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
576+
});
577+
}
578+
397579
QueueMessage::QueueMessage(
398580
jsg::Lock& js, rpc::QueueMessage::Reader message, IoPtr<QueueEventResult> result)
399581
: id(kj::str(message.getId())),

src/workerd/api/queue.h

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,46 @@ class WorkerQueue: public jsg::Object {
3535
JSG_STRUCT_TS_OVERRIDE(QueueMetrics);
3636
};
3737

38+
struct SendMetrics {
39+
double backlogCount;
40+
double backlogBytes;
41+
double oldestMessageTimestamp;
42+
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
43+
JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics);
44+
};
45+
46+
struct SendMetadata {
47+
SendMetrics metrics;
48+
JSG_STRUCT(metrics);
49+
JSG_STRUCT_TS_OVERRIDE(QueueSendMetadata);
50+
};
51+
52+
struct SendResponse {
53+
SendMetadata metadata;
54+
JSG_STRUCT(metadata);
55+
JSG_STRUCT_TS_OVERRIDE(QueueSendResponse);
56+
};
57+
58+
struct SendBatchMetrics {
59+
double backlogCount;
60+
double backlogBytes;
61+
double oldestMessageTimestamp;
62+
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
63+
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics);
64+
};
65+
66+
struct SendBatchMetadata {
67+
SendBatchMetrics metrics;
68+
JSG_STRUCT(metrics);
69+
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetadata);
70+
};
71+
72+
struct SendBatchResponse {
73+
SendBatchMetadata metadata;
74+
JSG_STRUCT(metadata);
75+
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchResponse);
76+
};
77+
3878
struct SendOptions {
3979
// TODO(soon): Support metadata.
4080

@@ -77,26 +117,39 @@ class WorkerQueue: public jsg::Object {
77117

78118
kj::Promise<void> send(jsg::Lock& js, jsg::JsValue body, jsg::Optional<SendOptions> options);
79119

120+
jsg::Promise<SendResponse> sendWithResponse(jsg::Lock& js,
121+
jsg::JsValue body,
122+
jsg::Optional<SendOptions> options,
123+
const jsg::TypeHandler<SendResponse>& responseHandler);
124+
80125
kj::Promise<void> sendBatch(jsg::Lock& js,
81126
jsg::Sequence<MessageSendRequest> batch,
82127
jsg::Optional<SendBatchOptions> options);
83128

129+
jsg::Promise<SendBatchResponse> sendBatchWithResponse(jsg::Lock& js,
130+
jsg::Sequence<MessageSendRequest> batch,
131+
jsg::Optional<SendBatchOptions> options,
132+
const jsg::TypeHandler<SendBatchResponse>& responseHandler);
133+
84134
jsg::Promise<Metrics> metrics(jsg::Lock& js, const jsg::TypeHandler<Metrics>& metricsHandler);
85135

86136
JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
87-
JSG_METHOD(send);
88-
JSG_METHOD(sendBatch);
89137
if (flags.getWorkerdExperimental()) {
138+
JSG_METHOD_NAMED(send, sendWithResponse);
139+
JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);
90140
JSG_METHOD(metrics);
141+
} else {
142+
JSG_METHOD(send);
143+
JSG_METHOD(sendBatch);
91144
}
92145

93146
JSG_TS_ROOT();
94147
if (flags.getWorkerdExperimental()) {
95148
JSG_TS_OVERRIDE(Queue<Body = unknown> {
96-
send(message: Body, options?: QueueSendOptions): Promise<void>;
149+
send(message: Body, options?: QueueSendOptions): Promise<QueueSendResponse>;
97150
sendBatch(messages
98151
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
99-
: Promise<void>;
152+
: Promise<QueueSendBatchResponse>;
100153
metrics(): Promise<QueueMetrics>;
101154
});
102155
} else {
@@ -446,7 +499,10 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
446499
};
447500

448501
#define EW_QUEUE_ISOLATE_TYPES \
449-
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
502+
api::WorkerQueue, api::WorkerQueue::SendMetrics, api::WorkerQueue::SendMetadata, \
503+
api::WorkerQueue::SendResponse, api::WorkerQueue::SendBatchMetrics, \
504+
api::WorkerQueue::SendBatchMetadata, api::WorkerQueue::SendBatchResponse, \
505+
api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
450506
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::MessageBatchMetrics, \
451507
api::MessageBatchMetadata, api::IncomingQueueMessage, api::QueueRetryBatch, \
452508
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \

src/workerd/api/tests/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,14 @@ wd_test(
164164
data = ["queue-metadata-test.js"],
165165
)
166166

167+
wd_test(
168+
src = "queue-producer-metadata-test.wd-test",
169+
args = ["--experimental"],
170+
data = [
171+
"queue-producer-metadata-test.js",
172+
],
173+
)
174+
167175
wd_test(
168176
src = "r2-test.wd-test",
169177
args = ["--experimental"],
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) 2023 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
import assert from 'node:assert';
6+
7+
const SEND_RESPONSE = {
8+
metadata: {
9+
metrics: {
10+
backlogCount: 100,
11+
backlogBytes: 2048,
12+
oldestMessageTimestamp: 1000000,
13+
},
14+
},
15+
};
16+
17+
const SEND_BATCH_RESPONSE = {
18+
metadata: {
19+
metrics: {
20+
backlogCount: 200,
21+
backlogBytes: 4096,
22+
oldestMessageTimestamp: 2000000,
23+
},
24+
},
25+
};
26+
27+
export default {
28+
async fetch(request) {
29+
const { pathname } = new URL(request.url);
30+
31+
if (pathname === '/message') {
32+
const text = await request.text();
33+
assert.strictEqual(request.method, 'POST');
34+
assert.strictEqual(text, 'abc');
35+
return Response.json(SEND_RESPONSE);
36+
}
37+
38+
if (pathname === '/batch') {
39+
assert.strictEqual(request.method, 'POST');
40+
const body = await request.json();
41+
assert.strictEqual(body.messages.length, 1);
42+
return Response.json(SEND_BATCH_RESPONSE);
43+
}
44+
45+
return new Response('Not Found', { status: 404 });
46+
},
47+
48+
async test(ctrl, env) {
49+
const responseBodyEnabled = env.RESPONSE_BODY_FLAG;
50+
51+
const sendResult = await env.QUEUE.send('abc', { contentType: 'text' });
52+
const sendBatchResult = await env.QUEUE.sendBatch([
53+
{ body: 'def', contentType: 'text' },
54+
]);
55+
56+
if (responseBodyEnabled) {
57+
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100);
58+
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048);
59+
assert.strictEqual(
60+
sendResult.metadata.metrics.oldestMessageTimestamp,
61+
1000000
62+
);
63+
64+
assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200);
65+
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096);
66+
assert.strictEqual(
67+
sendBatchResult.metadata.metrics.oldestMessageTimestamp,
68+
2000000
69+
);
70+
} else {
71+
assert.strictEqual(sendResult, undefined);
72+
assert.strictEqual(sendBatchResult, undefined);
73+
}
74+
},
75+
};

0 commit comments

Comments
 (0)