Skip to content

MQ-1200 Add response body to queue send() and sendBatch()#6354

Merged
jasnell merged 1 commit intocloudflare:mainfrom
KennethRuan:kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch
Mar 27, 2026
Merged

MQ-1200 Add response body to queue send() and sendBatch()#6354
jasnell merged 1 commit intocloudflare:mainfrom
KennethRuan:kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch

Conversation

@KennethRuan
Copy link
Copy Markdown
Contributor

@KennethRuan KennethRuan commented Mar 18, 2026

Summary

Adds support for returning structured JSON responses from the send() and sendBatch() methods from a worker's env.QUEUE binding. The changes are gated behind the queue_send_response_body/no_queue_send_response_body flag.

Changes

Depending on the experimental queue_send_response_body flag, either Promise<void> or Promise<QueueSendResponse> will be returned by the send methods. In order to support both the old and new return types, the functions had to be duplicated.

jsg::Promise<WorkerQueue::SendResponse> WorkerQueue::sendWithResponse(jsg::Lock& js,
    jsg::JsValue body,
    jsg::Optional<SendOptions> options,
    const jsg::TypeHandler<SendResponse>& responseHandler) {

  ...

  static constexpr auto handleSend = [](auto req, auto serialized, auto client, auto& headerIds,
                                         bool exposeErrorCodes) -> kj::Promise<kj::String> {
    ...

    auto responseBody = co_await response.body->readAllBytes();
    co_return kj::str(responseBody.asChars());
  };

  auto promise =
      handleSend(kj::mv(req), kj::mv(serialized), kj::mv(client), headerIds, exposeErrorCodes);

  return context.awaitIo(
      js, kj::mv(promise), [&responseHandler](jsg::Lock& js, kj::String text) -> SendResponse {
    auto parsed = jsg::JsValue::fromJson(js, text);
    KJ_IF_SOME(result, responseHandler.tryUnwrap(js, parsed)) {
      return kj::mv(result);
    }
    _JSG_INTERNAL_FAIL_REQUIRE(JSG_EXCEPTION(Error), "Failed to parse queue send response", text);
  });
}

On the Typescript side, these new functions are exposed with the same name.

  JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
    if (flags.getQueueSendResponseBody()) {
      JSG_METHOD_NAMED(send, sendWithResponse);
      JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);
    } else {
      JSG_METHOD(send);
      JSG_METHOD(sendBatch);
    }

Upstream changes can be found here: https://gitlab.cfdata.org/cloudflare/mq/queue-broker-worker/-/merge_requests/1768#c210bdd061230e9c1f9da3b517fbecabd025c5c4

Testing

  • bazel test //src/workerd/api/tests:queue-test@
  • bazel test //src/workerd/api/tests:queue-test@all-compat-flags
  • bazel test //src/workerd/api/tests:queue-producer-metadata-test@
  • bazel test //src/workerd/api/tests:queue-producer-metadata-test@all-compat-flags

@KennethRuan KennethRuan requested review from a team as code owners March 18, 2026 21:50
jsg::JsValue body,
jsg::Optional<SendOptions> options,
const jsg::TypeHandler<SendResponse>& responseHandler);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget, did we decide to put send() / sendBatch() with responses behind a compat flag? To me this change looks non-breaking, and so does not necessarily need to be behind a compat flag. It'd certainly make implementation a lot simpler too?

Copy link
Copy Markdown
Contributor Author

@KennethRuan KennethRuan Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here for posterity:
Based on discussions, we decided to put these changes under the experimental flag. After internal testing, we'll need to put in a PR to remove the experimental flag and also regenerate the production type files. We were imagining all the metrics changes can go in together, so this may be the easier approach for us.

Link to other metrics-related Queues PRs:
#6246
#6339

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Mar 20, 2026

Codecov Report

❌ Patch coverage is 71.03448% with 42 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.78%. Comparing base (a9fc5bb) to head (dda70b4).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
src/workerd/api/queue.c++ 71.03% 27 Missing and 15 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6354      +/-   ##
==========================================
- Coverage   70.79%   70.78%   -0.01%     
==========================================
  Files         427      427              
  Lines      117724   117869     +145     
  Branches    18909    18935      +26     
==========================================
+ Hits        83338    83438     +100     
- Misses      23132    23160      +28     
- Partials    11254    11271      +17     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@KennethRuan KennethRuan force-pushed the kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch branch 2 times, most recently from a88a242 to e7cc945 Compare March 22, 2026 20:34
KJ_IF_SOME(result, responseHandler.tryUnwrap(js, parsed)) {
return kj::mv(result);
}
_JSG_INTERNAL_FAIL_REQUIRE(JSG_EXCEPTION(Error), "Failed to parse queue send response", text);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use JSG_REQUIRE_NONNULL instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated!

return kj::mv(result);
}
_JSG_INTERNAL_FAIL_REQUIRE(
JSG_EXCEPTION(Error), "Failed to parse queue sendBatch response", text);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use JSG_REQUIRE_NONNULL... and just in general, don't use the _-prefixed macros directly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updates as well, thanks! I noticed my changes in the metrics() PR had the same issue, I will submit a follow up PR to clean that up.

@KennethRuan KennethRuan force-pushed the kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch branch from e7cc945 to 648cdb6 Compare March 26, 2026 18:51
@KennethRuan KennethRuan requested a review from a team as a code owner March 26, 2026 18:51
@KennethRuan KennethRuan requested a review from penalosa March 26, 2026 18:51
@KennethRuan KennethRuan force-pushed the kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch branch from 648cdb6 to 22f1aa9 Compare March 26, 2026 19:30
@KennethRuan KennethRuan force-pushed the kruan/MQ-1200-read-response-body-from-QUEUE-send-and-sendbatch branch from 22f1aa9 to dda70b4 Compare March 26, 2026 20:52
@jasnell
Copy link
Copy Markdown
Collaborator

jasnell commented Mar 27, 2026

Manual run of internal-build succeeds. Merging.

@jasnell jasnell merged commit 29e2213 into cloudflare:main Mar 27, 2026
28 of 30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants