Skip to content

Commit a9fc5bb

Browse files
authored
Merge pull request #6339 from KennethRuan/kruan/MQ-1202-add-metrics-metadata-to-queue-handler
2 parents ac4bdf9 + 8676265 commit a9fc5bb

11 files changed

Lines changed: 243 additions & 19 deletions

File tree

src/workerd/api/http.c++

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2323,8 +2323,10 @@ jsg::Promise<void> Fetcher::delete_(jsg::Lock& js, kj::String url) {
23232323
return throwOnError(js, "DELETE", fetchImpl(js, JSG_THIS, kj::mv(url), kj::mv(subInit)));
23242324
}
23252325

2326-
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
2327-
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages) {
2326+
jsg::Promise<Fetcher::QueueResult> Fetcher::queue(jsg::Lock& js,
2327+
kj::String queueName,
2328+
kj::Array<ServiceBindingQueueMessage> messages,
2329+
jsg::Optional<MessageBatchMetadata> metadata) {
23282330
auto& ioContext = IoContext::current();
23292331

23302332
auto encodedMessages = kj::heapArrayBuilder<IncomingQueueMessage>(messages.size());
@@ -2357,6 +2359,7 @@ jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
23572359
auto event = kj::refcounted<api::QueueCustomEvent>(QueueEvent::Params{
23582360
.queueName = kj::mv(queueName),
23592361
.messages = encodedMessages.finish(),
2362+
.metadata = kj::mv(metadata).orDefault({}),
23602363
});
23612364

23622365
auto eventRef =

src/workerd/api/http.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,10 @@ class Fetcher: public JsRpcClientProvider {
392392
JSG_STRUCT(outcome, ackAll, retryBatch, explicitAcks, retryMessages);
393393
};
394394

395-
jsg::Promise<QueueResult> queue(
396-
jsg::Lock& js, kj::String queueName, kj::Array<ServiceBindingQueueMessage> messages);
395+
jsg::Promise<QueueResult> queue(jsg::Lock& js,
396+
kj::String queueName,
397+
kj::Array<ServiceBindingQueueMessage> messages,
398+
jsg::Optional<MessageBatchMetadata> metadata);
397399

398400
struct ScheduledOptions {
399401
jsg::Optional<kj::Date> scheduledTime;
@@ -446,7 +448,7 @@ class Fetcher: public JsRpcClientProvider {
446448
) & {
447449
fetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response>;
448450
connect(address: SocketAddress | string, options?: SocketOptions): Socket;
449-
queue(queueName: string, messages: ServiceBindingQueueMessage[]): Promise<FetcherQueueResult>;
451+
queue(queueName: string, messages: ServiceBindingQueueMessage[], metadata?: MessageBatchMetadata): Promise<FetcherQueueResult>;
450452
scheduled(options?: FetcherScheduledOptions): Promise<FetcherScheduledResult>;
451453
});
452454
} else {

src/workerd/api/queue.c++

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,11 +477,23 @@ QueueEvent::QueueEvent(
477477
messagesBuilder.add(js.alloc<QueueMessage>(js, incoming[i], result));
478478
}
479479
messages = messagesBuilder.finish();
480+
481+
// Extract metadata. If the sender didn't set the field, capnp defaults all values to zero.
482+
auto m = params.getMetadata().getMetrics();
483+
metadata = MessageBatchMetadata{
484+
.metrics =
485+
MessageBatchMetrics{
486+
.backlogCount = m.getBacklogCount(),
487+
.backlogBytes = m.getBacklogBytes(),
488+
.oldestMessageTimestamp = m.getOldestMessageTimestamp(),
489+
},
490+
};
480491
}
481492

482493
QueueEvent::QueueEvent(jsg::Lock& js, Params params, IoPtr<QueueEventResult> result)
483494
: ExtendableEvent("queue"),
484495
queueName(kj::mv(params.queueName)),
496+
metadata(kj::mv(params.metadata)),
485497
result(result) {
486498
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
487499
for (auto i: kj::indices(params.messages)) {
@@ -774,6 +786,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
774786
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
775787
req.setQueueName(p.getQueueName());
776788
req.setMessages(p.getMessages());
789+
req.setMetadata(p.getMetadata());
777790
}
778791
KJ_CASE_ONEOF(p, QueueEvent::Params) {
779792
req.setQueueName(p.queueName);
@@ -787,6 +800,13 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
787800
}
788801
messages[i].setAttempts(p.messages[i].attempts);
789802
}
803+
{
804+
auto metadataBuilder = req.initMetadata();
805+
auto metricsBuilder = metadataBuilder.initMetrics();
806+
metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount);
807+
metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes);
808+
metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp);
809+
}
790810
}
791811
}
792812

src/workerd/api/queue.h

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,22 @@ class WorkerQueue: public jsg::Object {
116116

117117
// Event handler types
118118

119+
// Metadata delivered with a message batch in the queue() handler
120+
121+
struct MessageBatchMetrics {
122+
double backlogCount;
123+
double backlogBytes;
124+
double oldestMessageTimestamp;
125+
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
126+
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics);
127+
};
128+
129+
struct MessageBatchMetadata {
130+
MessageBatchMetrics metrics;
131+
JSG_STRUCT(metrics);
132+
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetadata);
133+
};
134+
119135
// Types for other workers passing messages into and responses out of a queue handler.
120136

121137
struct IncomingQueueMessage {
@@ -235,6 +251,7 @@ class QueueEvent final: public ExtendableEvent {
235251
struct Params {
236252
kj::String queueName;
237253
kj::Array<IncomingQueueMessage> messages;
254+
MessageBatchMetadata metadata;
238255
};
239256

240257
explicit QueueEvent(jsg::Lock& js,
@@ -250,30 +267,45 @@ class QueueEvent final: public ExtendableEvent {
250267
kj::StringPtr getQueueName() {
251268
return queueName;
252269
}
270+
MessageBatchMetadata getMetadata() {
271+
return metadata;
272+
}
253273

254274
void retryAll(jsg::Optional<QueueRetryOptions> options);
255275
void ackAll();
256276

257-
JSG_RESOURCE_TYPE(QueueEvent) {
277+
JSG_RESOURCE_TYPE(QueueEvent, CompatibilityFlags::Reader flags) {
258278
JSG_INHERIT(ExtendableEvent);
259279

260280
JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages);
261281
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);
262282

283+
if (flags.getWorkerdExperimental()) {
284+
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
285+
}
286+
263287
JSG_METHOD(retryAll);
264288
JSG_METHOD(ackAll);
265289

266290
JSG_TS_ROOT();
267-
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
268-
readonly messages: readonly Message<Body>[];
269-
});
291+
if (flags.getWorkerdExperimental()) {
292+
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
293+
readonly messages: readonly Message<Body>[];
294+
readonly metadata: MessageBatchMetadata;
295+
});
296+
} else {
297+
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
298+
readonly messages: readonly Message<Body>[];
299+
});
300+
}
270301
}
271302

272303
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
273304
for (auto& message: messages) {
274305
tracker.trackField("message", message);
275306
}
276307
tracker.trackField("queueName", queueName);
308+
tracker.trackFieldWithSize("metadata", sizeof(MessageBatchMetadata));
277309
tracker.trackFieldWithSize("IoPtr<QueueEventResult>", sizeof(IoPtr<QueueEventResult>));
278310
}
279311

@@ -297,6 +329,7 @@ class QueueEvent final: public ExtendableEvent {
297329
// array to avoid one intermediate copy?
298330
kj::Array<jsg::Ref<QueueMessage>> messages;
299331
kj::String queueName;
332+
MessageBatchMetadata metadata;
300333
IoPtr<QueueEventResult> result;
301334
CompletionStatus completionStatus = Incomplete{};
302335

@@ -316,24 +349,38 @@ class QueueController final: public jsg::Object {
316349
kj::StringPtr getQueueName() {
317350
return event->getQueueName();
318351
}
352+
MessageBatchMetadata getMetadata() {
353+
return event->getMetadata();
354+
}
319355
void retryAll(jsg::Optional<QueueRetryOptions> options) {
320356
event->retryAll(options);
321357
}
322358
void ackAll() {
323359
event->ackAll();
324360
}
325361

326-
JSG_RESOURCE_TYPE(QueueController) {
362+
JSG_RESOURCE_TYPE(QueueController, CompatibilityFlags::Reader flags) {
327363
JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages);
328364
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);
329365

366+
if (flags.getWorkerdExperimental()) {
367+
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
368+
}
369+
330370
JSG_METHOD(retryAll);
331371
JSG_METHOD(ackAll);
332372

333373
JSG_TS_ROOT();
334-
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
335-
readonly messages: readonly Message<Body>[];
336-
});
374+
if (flags.getWorkerdExperimental()) {
375+
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
376+
readonly messages: readonly Message<Body>[];
377+
readonly metadata: MessageBatchMetadata;
378+
});
379+
} else {
380+
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
381+
readonly messages: readonly Message<Body>[];
382+
});
383+
}
337384
}
338385

339386
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
@@ -400,8 +447,9 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
400447

401448
#define EW_QUEUE_ISOLATE_TYPES \
402449
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
403-
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::IncomingQueueMessage, \
404-
api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, \
405-
api::QueueMessage, api::QueueEvent, api::QueueController, api::QueueExportedHandler
450+
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::MessageBatchMetrics, \
451+
api::MessageBatchMetadata, api::IncomingQueueMessage, api::QueueRetryBatch, \
452+
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
453+
api::QueueEvent, api::QueueController, api::QueueExportedHandler
406454

407455
} // namespace workerd::api

src/workerd/api/tests/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ wd_test(
158158
data = ["queue-metrics-test.js"],
159159
)
160160

161+
wd_test(
162+
src = "queue-metadata-test.wd-test",
163+
args = ["--experimental"],
164+
data = ["queue-metadata-test.js"],
165+
)
166+
161167
wd_test(
162168
src = "r2-test.wd-test",
163169
args = ["--experimental"],
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) 2026 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
import assert from 'node:assert';
6+
7+
export default {
8+
async queue(batch, env, ctx) {
9+
const flagEnabled = env.METADATA_FLAG;
10+
11+
if (!flagEnabled) {
12+
// Flag disabled → metadata property should not exist
13+
assert.strictEqual(batch.metadata, undefined);
14+
batch.ackAll();
15+
return;
16+
}
17+
18+
// Flag enabled → metadata should always be present
19+
assert.ok(batch.metadata, 'Expected batch.metadata to be defined');
20+
assert.ok(
21+
batch.metadata.metrics,
22+
'Expected batch.metadata.metrics to be defined'
23+
);
24+
25+
if (
26+
batch.metadata.metrics.backlogCount === 0 &&
27+
batch.metadata.metrics.backlogBytes === 0 &&
28+
batch.metadata.metrics.oldestMessageTimestamp === 0
29+
) {
30+
// If metadata is omitted → all values default to zero
31+
batch.ackAll();
32+
return;
33+
}
34+
35+
// Explicit metadata path
36+
assert.strictEqual(batch.metadata.metrics.backlogCount, 100);
37+
assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048);
38+
assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000);
39+
batch.ackAll();
40+
},
41+
42+
async test(ctrl, env, ctx) {
43+
const flagEnabled = env.METADATA_FLAG;
44+
const timestamp = new Date();
45+
46+
if (flagEnabled) {
47+
const response1 = await env.SERVICE.queue(
48+
'test-queue',
49+
[{ id: '0', timestamp, body: 'hello', attempts: 1 }],
50+
{
51+
metrics: {
52+
backlogCount: 100,
53+
backlogBytes: 2048,
54+
oldestMessageTimestamp: 1000000,
55+
},
56+
}
57+
);
58+
assert.strictEqual(response1.outcome, 'ok');
59+
assert(response1.ackAll);
60+
61+
// Test with omitted metadata
62+
const response2 = await env.SERVICE.queue('test-queue', [
63+
{ id: '1', timestamp, body: 'world', attempts: 1 },
64+
]);
65+
assert.strictEqual(response2.outcome, 'ok');
66+
assert(response2.ackAll);
67+
} else {
68+
// Flag disabled → handler still works, metadata not visible
69+
const response = await env.SERVICE.queue('test-queue', [
70+
{ id: '0', timestamp, body: 'foobar', attempts: 1 },
71+
]);
72+
assert.strictEqual(response.outcome, 'ok');
73+
assert(response.ackAll);
74+
}
75+
},
76+
};
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "queue-metadata-test",
6+
worker = (
7+
modules = [
8+
( name = "worker", esModule = embed "queue-metadata-test.js" )
9+
],
10+
bindings = [
11+
( name = "SERVICE", service = "queue-metadata-test" ),
12+
( name = "METADATA_FLAG", json = "true" ),
13+
],
14+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"],
15+
)
16+
),
17+
( name = "queue-metadata-disabled-test",
18+
worker = (
19+
modules = [
20+
( name = "worker-disabled", esModule = embed "queue-metadata-test.js" )
21+
],
22+
bindings = [
23+
( name = "SERVICE", service = "queue-metadata-disabled-test" ),
24+
( name = "METADATA_FLAG", json = "false" ),
25+
],
26+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
27+
)
28+
),
29+
],
30+
);

src/workerd/io/worker-interface.capnp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,20 @@ struct QueueResponse @0x90e98932c0bfc0de {
373373
# List of retry options for messages that were explicitly marked for retry.
374374
}
375375

376+
struct MessageBatchMetrics {
377+
backlogCount @0 :Float64;
378+
# Number of messages remaining in the queue backlog.
379+
backlogBytes @1 :Float64;
380+
# Total bytes of messages remaining in the queue backlog.
381+
oldestMessageTimestamp @2 :Float64;
382+
# Timestamp (ms since epoch) of the oldest message in the queue.
383+
}
384+
385+
struct MessageBatchMetadata {
386+
metrics @0 :MessageBatchMetrics;
387+
# Best effort queue metrics at the time the batch was dispatched.
388+
}
389+
376390
struct HibernatableWebSocketEventMessage {
377391
payload :union {
378392
text @0 :Text;
@@ -762,11 +776,13 @@ interface EventDispatcher @0xf20697475ec1752d {
762776
# It would be cleaner to handle that inside the implementation so we could mark the entire
763777
# interface (and file) with allowCancellation.
764778

765-
queue @8 (messages :List(QueueMessage), queueName :Text) -> (result :QueueResponse)
779+
queue @8 (messages :List(QueueMessage), queueName :Text, metadata :MessageBatchMetadata)
780+
-> (result :QueueResponse)
766781
$Cxx.allowCancellation;
767782
# Delivers a batch of queue messages to a worker's queue event handler. Returns information about
768783
# the success of the batch, including which messages should be considered acknowledged and which
769-
# should be retried.
784+
# should be retried. The optional metadata field carries queue metrics at the time the batch was
785+
# dispatched; it is safe for the sender to omit this field (the consumer sees it as absent).
770786

771787
jsRpcSession @9 () -> (topLevel :JsRpcTarget) $Cxx.allowCancellation;
772788
# Opens a JS rpc "session". The call does not return until the session is complete.

0 commit comments

Comments
 (0)