@@ -93,6 +93,22 @@ class WorkerQueue: public jsg::Object {
9393
9494// Event handler types
9595
96+ // Metadata delivered with a message batch in the queue() handler
97+
98+ struct MessageBatchMetrics {
99+ double backlogCount;
100+ double backlogBytes;
101+ double oldestMessageTimestamp;
102+ JSG_STRUCT (backlogCount, backlogBytes, oldestMessageTimestamp);
103+ JSG_STRUCT_TS_OVERRIDE (MessageBatchMetrics);
104+ };
105+
106+ struct MessageBatchMetadata {
107+ MessageBatchMetrics metrics;
108+ JSG_STRUCT (metrics);
109+ JSG_STRUCT_TS_OVERRIDE (MessageBatchMetadata);
110+ };
111+
96112// Types for other workers passing messages into and responses out of a queue handler.
97113
98114struct IncomingQueueMessage {
@@ -212,6 +228,7 @@ class QueueEvent final: public ExtendableEvent {
212228 struct Params {
213229 kj::String queueName;
214230 kj::Array<IncomingQueueMessage> messages;
231+ MessageBatchMetadata metadata;
215232 };
216233
217234 explicit QueueEvent (jsg::Lock& js,
@@ -227,23 +244,37 @@ class QueueEvent final: public ExtendableEvent {
227244 kj::StringPtr getQueueName () {
228245 return queueName;
229246 }
247+ MessageBatchMetadata getMetadata () {
248+ return metadata;
249+ }
230250
231251 void retryAll (jsg::Optional<QueueRetryOptions> options);
232252 void ackAll ();
233253
234- JSG_RESOURCE_TYPE (QueueEvent) {
254+ JSG_RESOURCE_TYPE (QueueEvent, CompatibilityFlags::Reader flags ) {
235255 JSG_INHERIT (ExtendableEvent);
236256
237257 JSG_LAZY_READONLY_INSTANCE_PROPERTY (messages, getMessages);
238258 JSG_READONLY_INSTANCE_PROPERTY (queue, getQueueName);
239259
260+ if (flags.getQueueBatchMetadata ()) {
261+ JSG_READONLY_INSTANCE_PROPERTY (metadata, getMetadata);
262+ }
263+
240264 JSG_METHOD (retryAll);
241265 JSG_METHOD (ackAll);
242266
243267 JSG_TS_ROOT ();
244- JSG_TS_OVERRIDE (QueueEvent<Body = unknown> {
245- readonly messages: readonly Message<Body>[];
246- });
268+ if (flags.getQueueBatchMetadata ()) {
269+ JSG_TS_OVERRIDE (QueueEvent<Body = unknown> {
270+ readonly messages: readonly Message<Body>[];
271+ readonly metadata: MessageBatchMetadata;
272+ });
273+ } else {
274+ JSG_TS_OVERRIDE (QueueEvent<Body = unknown> {
275+ readonly messages: readonly Message<Body>[];
276+ });
277+ }
247278 }
248279
249280 void visitForMemoryInfo (jsg::MemoryTracker& tracker) const {
@@ -274,6 +305,7 @@ class QueueEvent final: public ExtendableEvent {
274305 // array to avoid one intermediate copy?
275306 kj::Array<jsg::Ref<QueueMessage>> messages;
276307 kj::String queueName;
308+ MessageBatchMetadata metadata;
277309 IoPtr<QueueEventResult> result;
278310 CompletionStatus completionStatus = Incomplete{};
279311
@@ -293,24 +325,38 @@ class QueueController final: public jsg::Object {
293325 kj::StringPtr getQueueName () {
294326 return event->getQueueName ();
295327 }
328+ MessageBatchMetadata getMetadata () {
329+ return event->getMetadata ();
330+ }
296331 void retryAll (jsg::Optional<QueueRetryOptions> options) {
297332 event->retryAll (options);
298333 }
299334 void ackAll () {
300335 event->ackAll ();
301336 }
302337
303- JSG_RESOURCE_TYPE (QueueController) {
338+ JSG_RESOURCE_TYPE (QueueController, CompatibilityFlags::Reader flags ) {
304339 JSG_READONLY_INSTANCE_PROPERTY (messages, getMessages);
305340 JSG_READONLY_INSTANCE_PROPERTY (queue, getQueueName);
306341
342+ if (flags.getQueueBatchMetadata ()) {
343+ JSG_READONLY_INSTANCE_PROPERTY (metadata, getMetadata);
344+ }
345+
307346 JSG_METHOD (retryAll);
308347 JSG_METHOD (ackAll);
309348
310349 JSG_TS_ROOT ();
311- JSG_TS_OVERRIDE (MessageBatch<Body = unknown> {
312- readonly messages: readonly Message<Body>[];
313- });
350+ if (flags.getQueueBatchMetadata ()) {
351+ JSG_TS_OVERRIDE (MessageBatch<Body = unknown> {
352+ readonly messages: readonly Message<Body>[];
353+ readonly metadata: MessageBatchMetadata;
354+ });
355+ } else {
356+ JSG_TS_OVERRIDE (MessageBatch<Body = unknown> {
357+ readonly messages: readonly Message<Body>[];
358+ });
359+ }
314360 }
315361
316362 void visitForMemoryInfo (jsg::MemoryTracker& tracker) const {
@@ -377,8 +423,9 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re
377423
378424#define EW_QUEUE_ISOLATE_TYPES \
379425 api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
380- api::WorkerQueue::MessageSendRequest, api::IncomingQueueMessage, api::QueueRetryBatch, \
381- api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
382- api::QueueEvent, api::QueueController, api::QueueExportedHandler
426+ api::WorkerQueue::MessageSendRequest, api::MessageBatchMetrics, api::MessageBatchMetadata, \
427+ api::IncomingQueueMessage, api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, \
428+ api::QueueRetryOptions, api::QueueMessage, api::QueueEvent, api::QueueController, \
429+ api::QueueExportedHandler
383430
384431} // namespace workerd::api
0 commit comments