Skip to content

Commit 1b54323

Browse files
committed
Change Queues oldestMessageTimestamp to use kj::Date
1 parent 0a4da7e commit 1b54323

10 files changed

Lines changed: 177 additions & 23 deletions

File tree

src/workerd/api/queue.c++

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ namespace {
2323
// Header for the message format.
2424
static constexpr kj::StringPtr HDR_MSG_FORMAT = "X-Msg-Fmt"_kj;
2525

26+
// The upstream service sends 0 when there is "no data" available on a timestamp field (e.g. no `oldestMessageTimestamp`).
27+
// This method converts it to kj::none so users see `undefined`.
28+
void clearEpochSentinel(jsg::Optional<kj::Date>& ts) {
29+
KJ_IF_SOME(date, ts) {
30+
if (date == kj::UNIX_EPOCH) {
31+
ts = kj::none;
32+
}
33+
}
34+
}
35+
2636
// Header for the message delivery delay.
2737
static constexpr kj::StringPtr HDR_MSG_DELAY = "X-Msg-Delay-Secs"_kj;
2838

@@ -306,8 +316,10 @@ jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock&
306316
return context.awaitIo(
307317
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
308318
auto parsed = jsg::JsValue::fromJson(js, text);
309-
return JSG_REQUIRE_NONNULL(
319+
auto result = JSG_REQUIRE_NONNULL(
310320
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
321+
clearEpochSentinel(result.metadata.metrics.oldestMessageTimestamp);
322+
return kj::mv(result);
311323
});
312324
}
313325

@@ -456,6 +468,7 @@ jsg::Promise<WorkerQueue::Metrics> WorkerQueue::metrics(
456468
auto parsed = jsg::JsValue::fromJson(js, text);
457469
auto result = JSG_REQUIRE_NONNULL(metricsHandler.tryUnwrap(js, parsed), Error,
458470
"Failed to parse queue metrics response", text);
471+
clearEpochSentinel(result.oldestMessageTimestamp);
459472
return kj::mv(result);
460473
});
461474
}
@@ -569,8 +582,10 @@ jsg::Promise<WorkerQueue::SendBatchResponse> WorkerQueue::sendBatchWithResponse(
569582
return context.awaitIo(
570583
js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendBatchResponse {
571584
auto parsed = jsg::JsValue::fromJson(js, text);
572-
return JSG_REQUIRE_NONNULL(
585+
auto result = JSG_REQUIRE_NONNULL(
573586
responseHandler.tryUnwrap(js, parsed), Error, "Failed to parse queue send response", text);
587+
clearEpochSentinel(result.metadata.metrics.oldestMessageTimestamp);
588+
return kj::mv(result);
574589
});
575590
}
576591

@@ -658,14 +673,19 @@ QueueEvent::QueueEvent(
658673
}
659674
messages = messagesBuilder.finish();
660675

661-
// Extract metadata. If the sender didn't set the field, capnp defaults all values to zero.
676+
// Extract metadata. If the sender didn't set the field, capnp defaults all to the zero values.
662677
auto m = params.getMetadata().getMetrics();
678+
jsg::Optional<kj::Date> oldestTimestamp;
679+
if (m.getOldestMessageTimestamp() != 0) {
680+
oldestTimestamp =
681+
kj::UNIX_EPOCH + static_cast<int64_t>(m.getOldestMessageTimestamp()) * kj::MILLISECONDS;
682+
}
663683
metadata = MessageBatchMetadata{
664684
.metrics =
665685
MessageBatchMetrics{
666686
.backlogCount = m.getBacklogCount(),
667687
.backlogBytes = m.getBacklogBytes(),
668-
.oldestMessageTimestamp = m.getOldestMessageTimestamp(),
688+
.oldestMessageTimestamp = kj::mv(oldestTimestamp),
669689
},
670690
};
671691
}
@@ -985,7 +1005,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::sendRpc(
9851005
auto metricsBuilder = metadataBuilder.initMetrics();
9861006
metricsBuilder.setBacklogCount(p.metadata.metrics.backlogCount);
9871007
metricsBuilder.setBacklogBytes(p.metadata.metrics.backlogBytes);
988-
metricsBuilder.setOldestMessageTimestamp(p.metadata.metrics.oldestMessageTimestamp);
1008+
KJ_IF_SOME(ts, p.metadata.metrics.oldestMessageTimestamp) {
1009+
metricsBuilder.setOldestMessageTimestamp((ts - kj::UNIX_EPOCH) / kj::MILLISECONDS);
1010+
}
9891011
}
9901012
}
9911013
}

src/workerd/api/queue.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,22 @@ class WorkerQueue: public jsg::Object {
2727
// representing this queue.
2828
WorkerQueue(uint subrequestChannel): subrequestChannel(subrequestChannel) {}
2929

30+
// The metrics structs below (Metrics, SendMetrics, SendBatchMetrics) are deserialized from
31+
// JSON responses where the upstream service uses 0 as a sentinel for "no data" on timestamp
32+
// fields. Callers MUST call clearEpochSentinel() on oldestMessageTimestamp after deserialization to convert the
33+
// sentinel to kj::none (JS undefined).
3034
struct Metrics {
3135
double backlogCount;
3236
double backlogBytes;
33-
double oldestMessageTimestamp;
37+
jsg::Optional<kj::Date> oldestMessageTimestamp;
3438
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
3539
JSG_STRUCT_TS_OVERRIDE(QueueMetrics);
3640
};
3741

3842
struct SendMetrics {
3943
double backlogCount;
4044
double backlogBytes;
41-
double oldestMessageTimestamp;
45+
jsg::Optional<kj::Date> oldestMessageTimestamp;
4246
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
4347
JSG_STRUCT_TS_OVERRIDE(QueueSendMetrics);
4448
};
@@ -58,7 +62,7 @@ class WorkerQueue: public jsg::Object {
5862
struct SendBatchMetrics {
5963
double backlogCount;
6064
double backlogBytes;
61-
double oldestMessageTimestamp;
65+
jsg::Optional<kj::Date> oldestMessageTimestamp;
6266
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
6367
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchMetrics);
6468
};
@@ -171,10 +175,12 @@ class WorkerQueue: public jsg::Object {
171175

172176
// Metadata delivered with a message batch in the queue() handler
173177

178+
// Same sentinel caveat as WorkerQueue::Metrics above: the capnp path uses 0 to mean "no data"
179+
// for oldestMessageTimestamp. As such, we must explicitly set it to kj::none (JS undefined).
174180
struct MessageBatchMetrics {
175181
double backlogCount;
176182
double backlogBytes;
177-
double oldestMessageTimestamp;
183+
jsg::Optional<kj::Date> oldestMessageTimestamp;
178184
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
179185
JSG_STRUCT_TS_OVERRIDE(MessageBatchMetrics);
180186
};

src/workerd/api/tests/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ wd_test(
164164
data = ["queue-metadata-test.js"],
165165
)
166166

167+
wd_test(
168+
src = "queue-metrics-sentinel-test.wd-test",
169+
args = ["--experimental"],
170+
data = ["queue-metrics-sentinel-test.js"],
171+
)
172+
167173
wd_test(
168174
src = "queue-producer-metadata-test.wd-test",
169175
args = ["--experimental"],

src/workerd/api/tests/queue-metadata-test.js

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,24 @@ export default {
2525
if (
2626
batch.metadata.metrics.backlogCount === 0 &&
2727
batch.metadata.metrics.backlogBytes === 0 &&
28-
batch.metadata.metrics.oldestMessageTimestamp === 0
28+
batch.metadata.metrics.oldestMessageTimestamp === undefined
2929
) {
30-
// If metadata is omitted → all values default to zero
30+
// If metadata is omitted → counts default to zero, timestamp is undefined
3131
batch.ackAll();
3232
return;
3333
}
3434

3535
// Explicit metadata path
3636
assert.strictEqual(batch.metadata.metrics.backlogCount, 100);
3737
assert.strictEqual(batch.metadata.metrics.backlogBytes, 2048);
38-
assert.strictEqual(batch.metadata.metrics.oldestMessageTimestamp, 1000000);
38+
assert.ok(
39+
batch.metadata.metrics.oldestMessageTimestamp instanceof Date,
40+
'Expected oldestMessageTimestamp to be a Date'
41+
);
42+
assert.strictEqual(
43+
batch.metadata.metrics.oldestMessageTimestamp.getTime(),
44+
1000000
45+
);
3946
batch.ackAll();
4047
},
4148

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright (c) 2026 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
// Tests that the upstream sentinel value of 0 for oldestMessageTimestamp is
6+
// correctly converted to undefined (kj::none) by clearEpochSentinel().
7+
8+
import assert from 'node:assert';
9+
10+
export default {
11+
async fetch(request) {
12+
const { pathname } = new URL(request.url);
13+
14+
if (pathname === '/metrics') {
15+
return Response.json({
16+
backlogCount: 5,
17+
backlogBytes: 100,
18+
oldestMessageTimestamp: 0,
19+
});
20+
}
21+
22+
if (pathname === '/message') {
23+
await request.arrayBuffer();
24+
return Response.json({
25+
metadata: {
26+
metrics: {
27+
backlogCount: 5,
28+
backlogBytes: 100,
29+
oldestMessageTimestamp: 0,
30+
},
31+
},
32+
});
33+
}
34+
35+
if (pathname === '/batch') {
36+
await request.arrayBuffer();
37+
return Response.json({
38+
metadata: {
39+
metrics: {
40+
backlogCount: 10,
41+
backlogBytes: 200,
42+
oldestMessageTimestamp: 0,
43+
},
44+
},
45+
});
46+
}
47+
48+
return new Response('Not Found', { status: 404 });
49+
},
50+
51+
async test(ctrl, env) {
52+
// Test metrics() zero-sentinel → undefined
53+
const metrics = await env.QUEUE.metrics();
54+
assert.strictEqual(metrics.backlogCount, 5);
55+
assert.strictEqual(metrics.backlogBytes, 100);
56+
assert.strictEqual(
57+
metrics.oldestMessageTimestamp,
58+
undefined,
59+
'Expected oldestMessageTimestamp to be undefined when upstream sends 0'
60+
);
61+
62+
// Test send() zero-sentinel → undefined
63+
const sendResult = await env.QUEUE.send('abc', { contentType: 'text' });
64+
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 5);
65+
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 100);
66+
assert.strictEqual(
67+
sendResult.metadata.metrics.oldestMessageTimestamp,
68+
undefined,
69+
'Expected send oldestMessageTimestamp to be undefined when upstream sends 0'
70+
);
71+
72+
// Test sendBatch() zero-sentinel → undefined
73+
const sendBatchResult = await env.QUEUE.sendBatch([
74+
{ body: 'def', contentType: 'text' },
75+
]);
76+
assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 10);
77+
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 200);
78+
assert.strictEqual(
79+
sendBatchResult.metadata.metrics.oldestMessageTimestamp,
80+
undefined,
81+
'Expected sendBatch oldestMessageTimestamp to be undefined when upstream sends 0'
82+
);
83+
},
84+
};
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "queue-metrics-sentinel-test",
6+
worker = (
7+
modules = [
8+
( name = "worker", esModule = embed "queue-metrics-sentinel-test.js" )
9+
],
10+
bindings = [
11+
( name = "QUEUE", queue = "queue-metrics-sentinel-test" ),
12+
],
13+
compatibilityFlags = ["nodejs_compat", "queues_json_messages", "experimental", "capture_async_api_throws"],
14+
)
15+
),
16+
],
17+
);

src/workerd/api/tests/queue-metrics-test.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ export default {
2626
const metrics = await env.QUEUE.metrics();
2727
assert.strictEqual(metrics.backlogCount, 100);
2828
assert.strictEqual(metrics.backlogBytes, 2048);
29-
assert.strictEqual(metrics.oldestMessageTimestamp, 1000000);
29+
assert.ok(
30+
metrics.oldestMessageTimestamp instanceof Date,
31+
'Expected oldestMessageTimestamp to be a Date'
32+
);
33+
assert.strictEqual(metrics.oldestMessageTimestamp.getTime(), 1000000);
3034
} else {
3135
// Flag OFF → metrics() should not be exposed on the binding
3236
assert.strictEqual(typeof env.QUEUE.metrics, 'undefined');

src/workerd/api/tests/queue-producer-metadata-test.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,23 @@ export default {
5656
if (responseBodyEnabled) {
5757
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100);
5858
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048);
59+
assert.ok(
60+
sendResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
61+
'Expected oldestMessageTimestamp to be a Date'
62+
);
5963
assert.strictEqual(
60-
sendResult.metadata.metrics.oldestMessageTimestamp,
64+
sendResult.metadata.metrics.oldestMessageTimestamp.getTime(),
6165
1000000
6266
);
6367

6468
assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200);
6569
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096);
70+
assert.ok(
71+
sendBatchResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
72+
'Expected oldestMessageTimestamp to be a Date'
73+
);
6674
assert.strictEqual(
67-
sendBatchResult.metadata.metrics.oldestMessageTimestamp,
75+
sendBatchResult.metadata.metrics.oldestMessageTimestamp.getTime(),
6876
2000000
6977
);
7078
} else {

types/generated-snapshot/experimental/index.d.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2409,7 +2409,7 @@ interface Queue<Body = unknown> {
24092409
interface QueueSendMetrics {
24102410
backlogCount: number;
24112411
backlogBytes: number;
2412-
oldestMessageTimestamp: number;
2412+
oldestMessageTimestamp?: Date;
24132413
}
24142414
interface QueueSendMetadata {
24152415
metrics: QueueSendMetrics;
@@ -2420,7 +2420,7 @@ interface QueueSendResponse {
24202420
interface QueueSendBatchMetrics {
24212421
backlogCount: number;
24222422
backlogBytes: number;
2423-
oldestMessageTimestamp: number;
2423+
oldestMessageTimestamp?: Date;
24242424
}
24252425
interface QueueSendBatchMetadata {
24262426
metrics: QueueSendBatchMetrics;
@@ -2443,12 +2443,12 @@ interface MessageSendRequest<Body = unknown> {
24432443
interface QueueMetrics {
24442444
backlogCount: number;
24452445
backlogBytes: number;
2446-
oldestMessageTimestamp: number;
2446+
oldestMessageTimestamp?: Date;
24472447
}
24482448
interface MessageBatchMetrics {
24492449
backlogCount: number;
24502450
backlogBytes: number;
2451-
oldestMessageTimestamp: number;
2451+
oldestMessageTimestamp?: Date;
24522452
}
24532453
interface MessageBatchMetadata {
24542454
metrics: MessageBatchMetrics;

types/generated-snapshot/experimental/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2412,7 +2412,7 @@ export interface Queue<Body = unknown> {
24122412
export interface QueueSendMetrics {
24132413
backlogCount: number;
24142414
backlogBytes: number;
2415-
oldestMessageTimestamp: number;
2415+
oldestMessageTimestamp?: Date;
24162416
}
24172417
export interface QueueSendMetadata {
24182418
metrics: QueueSendMetrics;
@@ -2423,7 +2423,7 @@ export interface QueueSendResponse {
24232423
export interface QueueSendBatchMetrics {
24242424
backlogCount: number;
24252425
backlogBytes: number;
2426-
oldestMessageTimestamp: number;
2426+
oldestMessageTimestamp?: Date;
24272427
}
24282428
export interface QueueSendBatchMetadata {
24292429
metrics: QueueSendBatchMetrics;
@@ -2446,12 +2446,12 @@ export interface MessageSendRequest<Body = unknown> {
24462446
export interface QueueMetrics {
24472447
backlogCount: number;
24482448
backlogBytes: number;
2449-
oldestMessageTimestamp: number;
2449+
oldestMessageTimestamp?: Date;
24502450
}
24512451
export interface MessageBatchMetrics {
24522452
backlogCount: number;
24532453
backlogBytes: number;
2454-
oldestMessageTimestamp: number;
2454+
oldestMessageTimestamp?: Date;
24552455
}
24562456
export interface MessageBatchMetadata {
24572457
metrics: MessageBatchMetrics;

0 commit comments

Comments
 (0)