Skip to content

Commit a88a242

Browse files
committed
Add response body to queue send() and sendBatch()
1 parent 04c6588 commit a88a242

7 files changed

Lines changed: 425 additions & 14 deletions

File tree

src/workerd/api/queue.c++

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,76 @@ kj::Promise<void> WorkerQueue::send(
243243
.attach(context.registerPendingEvent());
244244
};
245245

246+
jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock& js,
247+
jsg::JsValue body,
248+
jsg::Optional<SendOptions> options,
249+
const jsg::TypeHandler<SendResponse>& responseHandler) {
250+
auto& context = IoContext::current();
251+
252+
JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");
253+
254+
auto headers = kj::HttpHeaders(context.getHeaderTable());
255+
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());
256+
257+
kj::Maybe<kj::StringPtr> contentType;
258+
KJ_IF_SOME(opts, options) {
259+
KJ_IF_SOME(type, opts.contentType) {
260+
auto validatedType = validateContentType(type);
261+
headers.addPtrPtr(HDR_MSG_FORMAT, validatedType);
262+
contentType = validatedType;
263+
}
264+
KJ_IF_SOME(secs, opts.delaySeconds) {
265+
headers.addPtr(HDR_MSG_DELAY, kj::str(secs));
266+
}
267+
}
268+
269+
Serialized serialized;
270+
KJ_IF_SOME(type, contentType) {
271+
serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY);
272+
} else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) {
273+
headers.addPtrPtr("X-Msg-Fmt", IncomingQueueMessage::ContentType::JSON);
274+
serialized = serialize(
275+
js, body, IncomingQueueMessage::ContentType::JSON, SerializeArrayBufferBehavior::DEEP_COPY);
276+
} else {
277+
serialized = serializeV8(js, body);
278+
}
279+
280+
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
281+
auto req = client->request(
282+
kj::HttpMethod::POST, "https://fake-host/message"_kjc, headers, serialized.data.size());
283+
284+
const auto& headerIds = context.getHeaderIds();
285+
const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes();
286+
287+
static constexpr auto handleSend = [](auto req, auto serialized, auto client, auto& headerIds,
288+
bool exposeErrorCodes) -> kj::Promise<kj::String> {
289+
co_await req.body->write(serialized.data);
290+
auto response = co_await req.response;
291+
292+
if (exposeErrorCodes) {
293+
JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));
294+
} else {
295+
JSG_REQUIRE(
296+
response.statusCode == 200, Error, kj::str("Queue send failed: ", response.statusText));
297+
}
298+
299+
auto responseBody = co_await response.body->readAllBytes();
300+
co_return kj::str(responseBody.asChars());
301+
};
302+
303+
auto promise =
304+
handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);
305+
306+
return context.awaitIo(
307+
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
308+
auto parsed = jsg::JsValue::fromJson(js, text);
309+
KJ_IF_SOME(result, responseHandler.tryUnwrap(js, parsed)) {
310+
return kj::mv(result);
311+
}
312+
_JSG_INTERNAL_FAIL_REQUIRE(JSG_EXCEPTION(Error), "Failed to parse queue send response", text);
313+
});
314+
}
315+
246316
kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
247317
jsg::Sequence<MessageSendRequest> batch,
248318
jsg::Optional<SendBatchOptions> options) {
@@ -362,6 +432,123 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
362432
.attach(context.registerPendingEvent());
363433
};
364434

435+
jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(jsg::Lock& js,
436+
jsg::Sequence<MessageSendRequest> batch,
437+
jsg::Optional<SendBatchOptions> options,
438+
const jsg::TypeHandler<SendBatchResponse>& responseHandler) {
439+
auto& context = IoContext::current();
440+
441+
JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message");
442+
443+
size_t totalSize = 0;
444+
size_t largestMessage = 0;
445+
auto messageCount = batch.size();
446+
auto builder = kj::heapArrayBuilder<SerializedWithOptions>(messageCount);
447+
for (auto& message: batch) {
448+
auto body = message.body.getHandle(js);
449+
JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");
450+
451+
SerializedWithOptions item;
452+
KJ_IF_SOME(secs, message.delaySeconds) {
453+
item.delaySeconds = secs;
454+
}
455+
456+
KJ_IF_SOME(contentType, message.contentType) {
457+
item.contentType = validateContentType(contentType);
458+
item.body = serialize(js, body, contentType, SerializeArrayBufferBehavior::SHALLOW_REFERENCE);
459+
} else if (workerd::FeatureFlags::get(js).getQueuesJsonMessages()) {
460+
item.contentType = IncomingQueueMessage::ContentType::JSON;
461+
item.body = serialize(js, body, IncomingQueueMessage::ContentType::JSON,
462+
SerializeArrayBufferBehavior::SHALLOW_REFERENCE);
463+
} else {
464+
item.body = serializeV8(js, body);
465+
}
466+
467+
builder.add(kj::mv(item));
468+
totalSize += builder.back().body.data.size();
469+
largestMessage = kj::max(largestMessage, builder.back().body.data.size());
470+
}
471+
auto serializedBodies = builder.finish();
472+
473+
auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 64 + 32;
474+
kj::Vector<char> bodyBuilder(estimatedSize);
475+
bodyBuilder.addAll("{\"messages\":["_kj);
476+
for (size_t i = 0; i < messageCount; ++i) {
477+
bodyBuilder.addAll("{\"body\":\""_kj);
478+
bodyBuilder.addAll(kj::encodeBase64(serializedBodies[i].body.data));
479+
bodyBuilder.add('"');
480+
481+
KJ_IF_SOME(contentType, serializedBodies[i].contentType) {
482+
bodyBuilder.addAll(",\"contentType\":\""_kj);
483+
bodyBuilder.addAll(contentType);
484+
bodyBuilder.add('"');
485+
}
486+
487+
KJ_IF_SOME(delaySecs, serializedBodies[i].delaySeconds) {
488+
bodyBuilder.addAll(",\"delaySecs\": "_kj);
489+
bodyBuilder.addAll(kj::str(delaySecs));
490+
}
491+
492+
bodyBuilder.addAll("}"_kj);
493+
if (i < messageCount - 1) {
494+
bodyBuilder.add(',');
495+
}
496+
}
497+
bodyBuilder.addAll("]}"_kj);
498+
bodyBuilder.add('\0');
499+
KJ_DASSERT(bodyBuilder.size() <= estimatedSize);
500+
kj::String body(bodyBuilder.releaseAsArray());
501+
KJ_DASSERT(jsg::JsValue::fromJson(js, body).isObject());
502+
503+
auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_send"_kjc);
504+
505+
auto headers = kj::HttpHeaders(context.getHeaderTable());
506+
headers.addPtr("CF-Queue-Batch-Count"_kj, kj::str(messageCount));
507+
headers.addPtr("CF-Queue-Batch-Bytes"_kj, kj::str(totalSize));
508+
headers.addPtr("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage));
509+
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString());
510+
511+
KJ_IF_SOME(opts, options) {
512+
KJ_IF_SOME(secs, opts.delaySeconds) {
513+
headers.addPtr(HDR_MSG_DELAY, kj::str(secs));
514+
}
515+
}
516+
517+
auto req =
518+
client->request(kj::HttpMethod::POST, "https://fake-host/batch"_kjc, headers, body.size());
519+
520+
const auto& headerIds = context.getHeaderIds();
521+
const auto exposeErrorCodes = workerd::FeatureFlags::get(js).getQueueExposeErrorCodes();
522+
static constexpr auto handleWrite = [](auto req, auto body, auto client, auto& headerIds,
523+
bool exposeErrorCodes) -> kj::Promise<kj::String> {
524+
co_await req.body->write(body.asBytes());
525+
auto response = co_await req.response;
526+
527+
if (exposeErrorCodes) {
528+
JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));
529+
} else {
530+
JSG_REQUIRE(response.statusCode == 200, Error,
531+
kj::str("Queue sendBatch failed: ", response.statusText));
532+
}
533+
534+
auto responseBody = co_await response.body->readAllBytes();
535+
co_return kj::str(responseBody.asChars());
536+
};
537+
538+
auto promise =
539+
handleWrite(kj::mv(req), kj::mv(body), kj::mv(client), headerIds, exposeErrorCodes);
540+
541+
return context.awaitIo(
542+
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse {
543+
auto parsed = jsg::JsValue::fromJson(js, text);
544+
KJ_IF_SOME(result, responseHandler.tryUnwrap(js, parsed)) {
545+
return kj::mv(result);
546+
}
547+
_JSG_INTERNAL_FAIL_REQUIRE(
548+
JSG_EXCEPTION(Error), "Failed to parse queue sendBatch response", text);
549+
});
550+
}
551+
365552
QueueMessage::QueueMessage(
366553
jsg::Lock& js, rpc::QueueMessage::Reader message, IoPtr<QueueEventResult> result)
367554
: id(kj::str(message.getId())),

src/workerd/api/queue.h

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,46 @@ class WorkerQueue: public jsg::Object {
2727
// representing this queue.
2828
WorkerQueue(uint subrequestChannel): subrequestChannel(subrequestChannel) {}
2929

30+
struct SendMetrics {
31+
double backlogCount;
32+
double backlogBytes;
33+
double oldestMessageTimestamp;
34+
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
35+
JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics);
36+
};
37+
38+
struct SendMetadata {
39+
SendMetrics metrics;
40+
JSG_STRUCT(metrics);
41+
JSG_STRUCT_TS_OVERRIDE(QueueSendMetadata);
42+
};
43+
44+
struct SendResponse {
45+
SendMetadata metadata;
46+
JSG_STRUCT(metadata);
47+
JSG_STRUCT_TS_OVERRIDE(QueueSendResponse);
48+
};
49+
50+
struct SendBatchMetrics {
51+
double backlogCount;
52+
double backlogBytes;
53+
double oldestMessageTimestamp;
54+
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
55+
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics);
56+
};
57+
58+
struct SendBatchMetadata {
59+
SendBatchMetrics metrics;
60+
JSG_STRUCT(metrics);
61+
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetadata);
62+
};
63+
64+
struct SendBatchResponse {
65+
SendBatchMetadata metadata;
66+
JSG_STRUCT(metadata);
67+
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchResponse);
68+
};
69+
3070
struct SendOptions {
3171
// TODO(soon): Support metadata.
3272

@@ -69,21 +109,45 @@ class WorkerQueue: public jsg::Object {
69109

70110
kj::Promise<void> send(jsg::Lock& js, jsg::JsValue body, jsg::Optional<SendOptions> options);
71111

112+
jsg::Promise<SendResponse> sendWithResponse(jsg::Lock& js,
113+
jsg::JsValue body,
114+
jsg::Optional<SendOptions> options,
115+
const jsg::TypeHandler<SendResponse>& responseHandler);
116+
72117
kj::Promise<void> sendBatch(jsg::Lock& js,
73118
jsg::Sequence<MessageSendRequest> batch,
74119
jsg::Optional<SendBatchOptions> options);
75120

76-
JSG_RESOURCE_TYPE(WorkerQueue) {
77-
JSG_METHOD(send);
78-
JSG_METHOD(sendBatch);
121+
jsg::Promise<SendBatchResponse> sendBatchWithResponse(jsg::Lock& js,
122+
jsg::Sequence<MessageSendRequest> batch,
123+
jsg::Optional<SendBatchOptions> options,
124+
const jsg::TypeHandler<SendBatchResponse>& responseHandler);
125+
126+
JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
127+
if (flags.getWorkerdExperimental()) {
128+
JSG_METHOD_NAMED(send, sendWithResponse);
129+
JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);
130+
} else {
131+
JSG_METHOD(send);
132+
JSG_METHOD(sendBatch);
133+
}
79134

80135
JSG_TS_ROOT();
81-
JSG_TS_OVERRIDE(Queue<Body = unknown> {
82-
send(message: Body, options?: QueueSendOptions): Promise<void>;
83-
sendBatch(messages
84-
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
85-
: Promise<void>;
86-
});
136+
if (flags.getWorkerdExperimental()) {
137+
JSG_TS_OVERRIDE(Queue<Body = unknown> {
138+
send(message: Body, options?: QueueSendOptions): Promise<QueueSendResponse>;
139+
sendBatch(messages
140+
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
141+
: Promise<QueueSendBatchResponse>;
142+
});
143+
} else {
144+
JSG_TS_OVERRIDE(Queue<Body = unknown> {
145+
send(message: Body, options?: QueueSendOptions): Promise<void>;
146+
sendBatch(messages
147+
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
148+
: Promise<void>;
149+
});
150+
}
87151
JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8");
88152
}
89153

@@ -376,7 +440,10 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
376440
};
377441

378442
#define EW_QUEUE_ISOLATE_TYPES \
379-
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
443+
api::WorkerQueue, api::WorkerQueue::SendMetrics, api::WorkerQueue::SendMetadata, \
444+
api::WorkerQueue::SendResponse, api::WorkerQueue::SendBatchMetrics, \
445+
api::WorkerQueue::SendBatchMetadata, api::WorkerQueue::SendBatchResponse, \
446+
api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
380447
api::WorkerQueue::MessageSendRequest, api::IncomingQueueMessage, api::QueueRetryBatch, \
381448
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
382449
api::QueueEvent, api::QueueController, api::QueueExportedHandler

src/workerd/api/tests/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ wd_test(
119119
],
120120
)
121121

122+
wd_test(
123+
src = "queue-producer-metadata-test.wd-test",
124+
args = ["--experimental"],
125+
data = [
126+
"queue-producer-metadata-test.js",
127+
],
128+
)
129+
122130
wd_test(
123131
src = "r2-test.wd-test",
124132
args = ["--experimental"],

0 commit comments

Comments
 (0)