Skip to content

Commit c395270

Browse files
committed
feat(MQ-1202): include metrics metadata in queue() handler message batch
1 parent 8d1aa50 commit c395270

11 files changed

Lines changed: 243 additions & 19 deletions

File tree

src/workerd/api/http.c++

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2230,8 +2230,10 @@ jsg::Promise<void> Fetcher::delete_(jsg::Lock& js, kj::String url) {
22302230
return throwOnError(js, "DELETE", fetchImpl(js, JSG_THIS, kj::mv(url), kj::mv(subInit)));
22312231
}
22322232

2233-
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
2234-
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages) {
2233+
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(jsg::Lock& js,
2234+
kj::String queueName,
2235+
kj::Array<ServiceBindingQueueMessage> messages,
2236+
jsg::Optional<MessageBatchMetadata> metadata) {
22352237
auto& ioContext = IoContext::current();
22362238

22372239
auto encodedMessages = kj::heapArrayBuilder<IncomingQueueMessage>(messages.size());
@@ -2264,6 +2266,7 @@ jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
22642266
auto event = kj::refcounted<api::QueueCustomEvent>(QueueEvent::Params{
22652267
.queueName = kj::mv(queueName),
22662268
.messages = encodedMessages.finish(),
2269+
.metadata = kj::mv(metadata).orDefault({}),
22672270
});
22682271

22692272
auto eventRef =

src/workerd/api/http.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,10 @@ class Fetcher: public JsRpcClientProvider {
392392
JSG_STRUCT(outcome, ackAll, retryBatch, explicitAcks, retryMessages);
393393
};
394394

395-
jsg::Promise<QueueResult> queue(
396-
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages);
395+
jsg::Promise<QueueResult> queue(jsg::Lock& js,
396+
kj::String queueName,
397+
kj::Array<ServiceBindingQueueMessage> messages,
398+
jsg::Optional<MessageBatchMetadata> metadata);
397399

398400
struct ScheduledOptions {
399401
jsg::Optional<kj::Date> scheduledTime;
@@ -446,7 +448,7 @@ class Fetcher: public JsRpcClientProvider {
446448
) & {
447449
fetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response>;
448450
connect(address: SocketAddress | string, options?: SocketOptions): Socket;
449-
queue(queueName: string, messages: ServiceBindingQueueMessage[]): Promise<FetcherQueueResult>;
451+
queue(queueName: string, messages: ServiceBindingQueueMessage[], metadata?: MessageBatchMetadata): Promise<FetcherQueueResult>;
450452
scheduled(options?: FetcherScheduledOptions): Promise<FetcherScheduledResult>;
451453
});
452454
} else {

src/workerd/api/queue.c++

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,11 +445,23 @@ QueueEvent::QueueEvent(
445445
messagesBuilder.add(js.alloc<QueueMessage>(js, incoming[i], result));
446446
}
447447
messages = messagesBuilder.finish();
448+
449+
// Extract metadata. If the sender didn't set the field, capnp defaults all values to zero.
450+
auto m = params.getMetadata().getMetrics();
451+
metadata = MessageBatchMetadata{
452+
.metrics =
453+
MessageBatchMetrics{
454+
.backlogCount = m.getBacklogCount(),
455+
.backlogBytes = m.getBacklogBytes(),
456+
.oldestMessageTimestamp = m.getOldestMessageTimestamp(),
457+
},
458+
};
448459
}
449460

450461
QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr<QueueEventResult> result)
451462
: ExtendableEvent("queue"),
452463
queueName(kj::mv(params.queueName)),
464+
metadata(kj::mv(params.metadata)),
453465
result(result) {
454466
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
455467
for (auto i: kj::indices(params.messages)) {
@@ -742,6 +754,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
742754
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
743755
req.setQueueName(p.getQueueName());
744756
req.setMessages(p.getMessages());
757+
req.setMetadata(p.getMetadata());
745758
}
746759
KJ_CASE_ONEOF(p, QueueEvent::Params) {
747760
req.setQueueName(p.queueName);
@@ -755,6 +768,13 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
755768
}
756769
messages[i].setAttempts(p.messages[i].attempts);
757770
}
771+
{
772+
auto metadataBuilder = req.initMetadata();
773+
auto metricsBuilder = metadataBuilder.initMetrics();
774+
metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount);
775+
metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes);
776+
metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp);
777+
}
758778
}
759779
}
760780

src/workerd/api/queue.h

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,22 @@ class WorkerQueue: public jsg::Object {
9393

9494
// Event handler types
9595

96+
// Metadata delivered with a message batch in the queue() handler
97+
98+
struct MessageBatchMetrics {
99+
double backlogCount;
100+
double backlogBytes;
101+
double oldestMessageTimestamp;
102+
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
103+
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics);
104+
};
105+
106+
struct MessageBatchMetadata {
107+
MessageBatchMetrics metrics;
108+
JSG_STRUCT(metrics);
109+
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetadata);
110+
};
111+
96112
// Types for other workers passing messages into and responses out of a queue handler.
97113

98114
struct IncomingQueueMessage {
@@ -212,6 +228,7 @@ class QueueEvent final: public ExtendableEvent {
212228
struct Params {
213229
kj::String queueName;
214230
kj::Array<IncomingQueueMessage> messages;
231+
MessageBatchMetadata metadata;
215232
};
216233

217234
explicit QueueEvent(jsg::Lock& js,
@@ -227,30 +244,45 @@ class QueueEvent final: public ExtendableEvent {
227244
kj::StringPtr getQueueName() {
228245
return queueName;
229246
}
247+
MessageBatchMetadata getMetadata() {
248+
return metadata;
249+
}
230250

231251
void retryAll(jsg::Optional<QueueRetryOptions> options);
232252
void ackAll();
233253

234-
JSG_RESOURCE_TYPE(QueueEvent) {
254+
JSG_RESOURCE_TYPE(QueueEvent, CompatibilityFlags::Reader flags) {
235255
JSG_INHERIT(ExtendableEvent);
236256

237257
JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages);
238258
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);
239259

260+
if (flags.getWorkerdExperimental()) {
261+
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
262+
}
263+
240264
JSG_METHOD(retryAll);
241265
JSG_METHOD(ackAll);
242266

243267
JSG_TS_ROOT();
244-
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
245-
readonly messages: readonly Message<Body>[];
246-
});
268+
if (flags.getWorkerdExperimental()) {
269+
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
270+
readonly messages: readonly Message<Body>[];
271+
readonly metadata: MessageBatchMetadata;
272+
});
273+
} else {
274+
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
275+
readonly messages: readonly Message<Body>[];
276+
});
277+
}
247278
}
248279

249280
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
250281
for (auto& message: messages) {
251282
tracker.trackField("message", message);
252283
}
253284
tracker.trackField("queueName", queueName);
285+
tracker.trackFieldWithSize("metadata", sizeof(MessageBatchMetadata));
254286
tracker.trackFieldWithSize("IoPtr<QueueEventResult>", sizeof(IoPtr<QueueEventResult>));
255287
}
256288

@@ -274,6 +306,7 @@ class QueueEvent final: public ExtendableEvent {
274306
// array to avoid one intermediate copy?
275307
kj::Array<jsg::Ref<QueueMessage>> messages;
276308
kj::String queueName;
309+
MessageBatchMetadata metadata;
277310
IoPtr<QueueEventResult> result;
278311
CompletionStatus completionStatus = Incomplete{};
279312

@@ -293,24 +326,38 @@ class QueueController final: public jsg::Object {
293326
kj::StringPtr getQueueName() {
294327
return event->getQueueName();
295328
}
329+
MessageBatchMetadata getMetadata() {
330+
return event->getMetadata();
331+
}
296332
void retryAll(jsg::Optional<QueueRetryOptions> options) {
297333
event->retryAll(options);
298334
}
299335
void ackAll() {
300336
event->ackAll();
301337
}
302338

303-
JSG_RESOURCE_TYPE(QueueController) {
339+
JSG_RESOURCE_TYPE(QueueController, CompatibilityFlags::Reader flags) {
304340
JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages);
305341
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);
306342

343+
if (flags.getWorkerdExperimental()) {
344+
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
345+
}
346+
307347
JSG_METHOD(retryAll);
308348
JSG_METHOD(ackAll);
309349

310350
JSG_TS_ROOT();
311-
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
312-
readonly messages: readonly Message<Body>[];
313-
});
351+
if (flags.getWorkerdExperimental()) {
352+
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
353+
readonly messages: readonly Message<Body>[];
354+
readonly metadata: MessageBatchMetadata;
355+
});
356+
} else {
357+
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
358+
readonly messages: readonly Message<Body>[];
359+
});
360+
}
314361
}
315362

316363
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
@@ -377,8 +424,9 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
377424

378425
#define EW_QUEUE_ISOLATE_TYPES \
379426
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
380-
api::WorkerQueue::MessageSendRequest, api::IncomingQueueMessage, api::QueueRetryBatch, \
381-
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
382-
api::QueueEvent, api::QueueController, api::QueueExportedHandler
427+
api::WorkerQueue::MessageSendRequest, api::MessageBatchMetrics, api::MessageBatchMetadata, \
428+
api::IncomingQueueMessage, api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, \
429+
api::QueueRetryOptions, api::QueueMessage, api::QueueEvent, api::QueueController, \
430+
api::QueueExportedHandler
383431

384432
} // namespace workerd::api

src/workerd/api/tests/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ wd_test(
119119
],
120120
)
121121

122+
wd_test(
123+
src = "queue-metadata-test.wd-test",
124+
args = ["--experimental"],
125+
data = ["queue-metadata-test.js"],
126+
)
127+
122128
wd_test(
123129
src = "r2-test.wd-test",
124130
args = ["--experimental"],
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) 2026 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
import assert from 'node:assert';
6+
7+
export default {
8+
async queue(batch, env, ctx) {
9+
const flagEnabled = env.METADATA_FLAG;
10+
11+
if (!flagEnabled) {
12+
// Flag disabled → metadata property should not exist
13+
assert.strictEqual(batch.metadata, undefined);
14+
batch.ackAll();
15+
return;
16+
}
17+
18+
// Flag enabled → metadata should always be present
19+
assert.ok(batch.metadata, 'Expected batch.metadata to be defined');
20+
assert.ok(
21+
batch.metadata.metrics,
22+
'Expected batch.metadata.metrics to be defined'
23+
);
24+
25+
if (
26+
batch.metadata.metrics.backlogCount === 0 &&
27+
batch.metadata.metrics.backlogBytes === 0 &&
28+
batch.metadata.metrics.oldestMessageTimestamp === 0
29+
) {
30+
// If metadata is omitted → all values default to zero
31+
batch.ackAll();
32+
return;
33+
}
34+
35+
// Explicit metadata path
36+
assert.strictEqual(batch.metadata.metrics.backlogCount, 100);
37+
assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048);
38+
assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000);
39+
batch.ackAll();
40+
},
41+
42+
async test(ctrl, env, ctx) {
43+
const flagEnabled = env.METADATA_FLAG;
44+
const timestamp = new Date();
45+
46+
if (flagEnabled) {
47+
const response1 = await env.SERVICE.queue(
48+
'test-queue',
49+
[{ id: '0', timestamp, body: 'hello', attempts: 1 }],
50+
{
51+
metrics: {
52+
backlogCount: 100,
53+
backlogBytes: 2048,
54+
oldestMessageTimestamp: 1000000,
55+
},
56+
}
57+
);
58+
assert.strictEqual(response1.outcome, 'ok');
59+
assert(response1.ackAll);
60+
61+
// Test with omitted metadata
62+
const response2 = await env.SERVICE.queue('test-queue', [
63+
{ id: '1', timestamp, body: 'world', attempts: 1 },
64+
]);
65+
assert.strictEqual(response2.outcome, 'ok');
66+
assert(response2.ackAll);
67+
} else {
68+
// Flag disabled → handler still works, metadata not visible
69+
const response = await env.SERVICE.queue('test-queue', [
70+
{ id: '0', timestamp, body: 'foobar', attempts: 1 },
71+
]);
72+
assert.strictEqual(response.outcome, 'ok');
73+
assert(response.ackAll);
74+
}
75+
},
76+
};
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "queue-metadata-test",
6+
worker = (
7+
modules = [
8+
( name = "worker", esModule = embed "queue-metadata-test.js" )
9+
],
10+
bindings = [
11+
( name = "SERVICE", service = "queue-metadata-test" ),
12+
( name = "METADATA_FLAG", json = "true" ),
13+
],
14+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"],
15+
)
16+
),
17+
( name = "queue-metadata-disabled-test",
18+
worker = (
19+
modules = [
20+
( name = "worker-disabled", esModule = embed "queue-metadata-test.js" )
21+
],
22+
bindings = [
23+
( name = "SERVICE", service = "queue-metadata-disabled-test" ),
24+
( name = "METADATA_FLAG", json = "false" ),
25+
],
26+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
27+
)
28+
),
29+
],
30+
);

src/workerd/io/worker-interface.capnp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,20 @@ struct QueueResponse @0x90e98932c0bfc0de {
367367
# List of retry options for messages that were explicitly marked for retry.
368368
}
369369

370+
struct MessageBatchMetrics {
371+
backlogCount @0 :Float64;
372+
# Number of messages remaining in the queue backlog.
373+
backlogBytes @1 :Float64;
374+
# Total bytes of messages remaining in the queue backlog.
375+
oldestMessageTimestamp @2 :Float64;
376+
# Timestamp (ms since epoch) of the oldest message in the queue.
377+
}
378+
379+
struct MessageBatchMetadata {
380+
metrics @0 :MessageBatchMetrics;
381+
# Best effort queue metrics at the time the batch was dispatched.
382+
}
383+
370384
struct HibernatableWebSocketEventMessage {
371385
payload :union {
372386
text @0 :Text;
@@ -756,11 +770,13 @@ interface EventDispatcher @0xf20697475ec1752d {
756770
# It would be cleaner to handle that inside the implementation so we could mark the entire
757771
# interface (and file) with allowCancellation.
758772

759-
queue @8 (messages :List(QueueMessage), queueName :Text) -> (result :QueueResponse)
773+
queue @8 (messages :List(QueueMessage), queueName :Text, metadata :MessageBatchMetadata)
774+
-> (result :QueueResponse)
760775
$Cxx.allowCancellation;
761776
# Delivers a batch of queue messages to a worker's queue event handler. Returns information about
762777
# the success of the batch, including which messages should be considered acknowledged and which
763-
# should be retried.
778+
# should be retried. The optional metadata field carries queue metrics at the time the batch was
779+
# dispatched; it is safe for the sender to omit this field (the consumer sees it as absent).
764780

765781
jsRpcSession @9 () -> (topLevel :JsRpcTarget) $Cxx.allowCancellation;
766782
# Opens a JS rpc "session". The call does not return until the session is complete.

0 commit comments

Comments
 (0)