Skip to content

[mqtt] Add portable MqttIO Read/Write transforms for batch and streaming (revives #32385)#38493

Merged
Abacn merged 3 commits into
apache:masterfrom
tkaymak:mqtt-xlang-schematransform
Jun 13, 2026
Merged

[mqtt] Add portable MqttIO Read/Write transforms for batch and streaming (revives #32385)#38493
Abacn merged 3 commits into
apache:masterfrom
tkaymak:mqtt-xlang-schematransform

Conversation

@tkaymak

@tkaymak tkaymak commented May 13, 2026

Copy link
Copy Markdown
Contributor

What

Revives the approved diff from PR #32385 (Add portable Mqtt source and sink transforms) and wires the new SchemaTransforms into Python cross-language wrapper generation, for both batch and streaming.

After this lands, Python users can do:

from apache_beam.io import ReadFromMqtt, WriteToMqtt

and reach MqttIO over xlang, the same way ReadFromKafka / WriteToKafka work today.

How

Two commits:

  1. [mqtt] Add SchemaTransform providers for MqttIO Read/Write

    • Decorate MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) + @SchemaFieldDescription so it round-trips through Beam Schemas.
    • New MqttReadSchemaTransformProvider (beam:schematransform:org.apache.beam:mqtt_read:v1) and MqttWriteSchemaTransformProvider (beam:schematransform:org.apache.beam:mqtt_write:v1), both @AutoService-registered, each with a description() documenting batch vs. streaming behaviour.
    • Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip via an embedded ActiveMQ broker.
    • Pull :sdks:java:io:mqtt into :sdks:java:io:expansion-service so the providers are discoverable by ExpansionService.
  2. [mqtt] Wire MqttIO into Python xlang wrapper generation

    • Add name overrides in sdks/standard_expansion_services.yaml so the generated wrappers use kafka-style naming (ReadFromMqtt / WriteToMqtt).
    • Regenerate sdks/standard_external_transforms.yaml via :sdks:python:generateExternalTransformsConfig.
    • Add an I/Os entry to CHANGES.md for 2.75.0.

Batch and streaming

The read SchemaTransform is unbounded by default: omitting maxNumRecords/maxReadTimeSeconds produces a streaming (unbounded) PCollection; setting either bounds it to a batch read. The write SchemaTransform works with both bounded and unbounded inputs.

The streaming-mode failure that @twosom flagged on the original PR (commentbatch worked, streaming did not) was root-caused: it is a limitation of the legacy local Python DirectRunner, which does not execute portable streaming cross-language unbounded reads — not a defect in MqttIO. On a portable streaming runner the unbounded read/write work end to end.

Verification

Built the io expansion-service shadowJar + a matching 2.75.0.dev Java SDK harness image + the Python SDK from source, and ran against a local Mosquitto broker:

Scenario DirectRunner PrismRunner
Read, batch (max_read_time_seconds)
Read, streaming (unbounded) ❌ hangs (runner limitation) ✅ continuous
Write, batch
Write, streaming (PeriodicImpulse)

The provider description() documents that streaming requires a portable streaming runner (Prism, Flink, Dataflow).

Java unit tests (:sdks:java:io:mqtt:check) pass on both Java 11 and Java 17 (-PtestJavaVersion=17).

Notes vs. PR #32385

  • Generic-typing fix for the post-Add support for Read with Meatadata in MqttIO #32668 API: MqttIO.Read<byte[]> / MqttIO.Write<byte[]> instead of the raw types in the original PR.
  • Naming: ReadFromMqtt / WriteToMqtt (kafka-style), per @Abacn's roadmap comment about onboarding through standard_expansion_services.yaml.
  • topic is now Optional in the generated schema because MqttIO.ConnectionConfiguration#getTopic() was made @Nullable by PR #32668 (readWithMetadata).

Credits

Original work by @ahmedabu98 and @twosom on PR #32385; @damondouglas approved that PR before it went stale and auto-closed on 2025-10-14. This revives that change, adds streaming support + tests, and the small adjustments above.

Closes the gap from #32385 / addresses #21060 (Python MQTT IO).

@tkaymak tkaymak force-pushed the mqtt-xlang-schematransform branch from c394159 to c272fde Compare June 10, 2026 18:35
@tkaymak tkaymak changed the title [mqtt] Add portable MqttIO Read/Write transforms (revives #32385) [mqtt] Add portable MqttIO Read/Write transforms for batch and streaming (revives #32385) Jun 10, 2026
@tkaymak tkaymak marked this pull request as ready for review June 10, 2026 18:36
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request revives and enhances the MQTT IO support by introducing portable SchemaTransforms for both reading and writing. By registering these transforms in the expansion service and updating the Python wrapper generation, it allows users to perform batch and streaming MQTT operations seamlessly across language boundaries. The changes also include necessary schema annotations and validation to ensure robust cross-language interoperability.

Highlights

  • SchemaTransform Providers: Implemented MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider to enable cross-language support for MQTT IO.
  • Python SDK Integration: Wired the new MQTT transforms into the Python SDK via cross-language wrapper generation, enabling ReadFromMqtt and WriteToMqtt.
  • Schema Annotations: Decorated MqttIO.ConnectionConfiguration with AutoValueSchema and field descriptions to ensure proper round-tripping through Beam Schemas.
  • Testing: Added comprehensive unit tests covering batch and streaming scenarios, including verification with an embedded ActiveMQ broker.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces portable ReadFromMqtt and WriteToMqtt schema transforms for MQTT IO, enabling cross-language usage in Python. The changes include adding the read and write schema transform providers, updating configuration classes with schema annotations, registering the new transforms, and adding comprehensive unit tests. The review feedback highlights a few areas for improvement: correcting a placeholder typo (% instead of %s) in a precondition check, renaming a generic parameter foo to connectionConfiguration for better readability, and replacing System.out.println with the class logger in the test code.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

@tkaymak tkaymak force-pushed the mqtt-xlang-schematransform branch 2 times, most recently from 0a415ec to c7d09d0 Compare June 10, 2026 18:48
@tkaymak

tkaymak commented Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @gemini-code-assist, all three are fair, applied in c7d09d04a96:

  1. %%s in the checkState message in MqttReadSchemaTransformProvider - genuine bug, the key set wouldn't have rendered.
  2. fooconnectionConfiguration on the write builder setter - agreed, leftover from the revived Add portable Mqtt source and sink transforms #32385 diff.
  3. System.out.printlnLOG.info with parameterized formatting in the test.

(Folded into the existing two commits to keep the history tidy rather than adding a fixup commit.)

@gemini-code-assist

Copy link
Copy Markdown
Contributor

That's great to hear, @tkaymak! Thank you for confirming those fixes and for keeping the commit history clean. The changes look solid, and the addition of the portable MQTT IO transforms will be a valuable asset for the Beam community. Since you've addressed the feedback, this looks ready to move forward.

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
implementation project(":sdks:java:extensions:kafka-factories")
permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
implementation project(":sdks:java:io:mqtt")

@Abacn Abacn Jun 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also discussed with @chamikaramj we recommend put mqttio into a new expansion service module,

I'm considering either in io/mqtt/expansion-service or a bit generic io/messagin-expansion-service that serving messaging protocols (mqtt, and possibly jms, solace in the future)

@tkaymak tkaymak Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pointers @Abacn! I went with the broader option: a new :sdks:java:io:messaging-expansion-service module (rather than mqtt-specific), so JMS/Solace can be added to it later without another module or a rename.

  • MqttIO is no longer added to :sdks:java:io:expansion-service (that change is reverted).
  • The new module pulls in only :sdks:java:expansion-service + :sdks:java:io:mqtt
  • standard_expansion_services.yaml now has a second entry for the messaging service emitting ReadFromMqtt/WriteToMqtt; it skips the core SchemaTransforms it bundles transitively, which keep being generated from the io service. Regenerated standard_external_transforms.yaml accordingly, and registered the new target in generateExternalTransformsConfig and the xlang servicesToGenerateFrom list.

Verified the new service end-to-end over xlang (Python → ReadFromMqtt, batch + streaming on Prism) against a local broker.

@tkaymak tkaymak force-pushed the mqtt-xlang-schematransform branch from c7d09d0 to 2204a9d Compare June 11, 2026 07:07

@Abacn Abacn left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

The next step would be setup a PostCommit for Xlang_Messaging, like what we did for Xlang_GCP and Xlang_IO:

https://github.com/apache/beam/blob/master/.github/workflows/beam_PostCommit_Python_Xlang_Gcp_Direct.yml

https://github.com/apache/beam/blob/master/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml

Direct runner should be sufficient so that we can use testcontainers. Running on Dataflow we'd need to deploy a remote mqtt server and could be out-of-scope for now.

attributes(["Multi-Release": true])
}
mergeServiceFiles()
outputs.upToDateWhen { false }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outputs.upToDateWhen { false }

This line was a workaround for corrupted gradle cache. It should have been removed from io/expansion-service.build.gradle as well and we don't need it here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in 9b0956f6099 — from this module and from io/expansion-service/build.gradle as well (separate small commit, since that module is otherwise untouched by this PR).

Comment thread CHANGES.md Outdated
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* MqttIO: portable `ReadFromMqtt` / `WriteToMqtt` transforms now available in Python via cross-language (`apache_beam.io`), for both batch and streaming ([#32385](https://github.com/apache/beam/issues/32385)).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As usual let's defer the announcement until tests have setup and are green

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense — removed the CHANGES.md entry in 9b0956f6099. I'll re-add the announcement in the follow-up PR that sets up the Xlang Messaging PostCommit, once it's green.

tkaymak added 3 commits June 12, 2026 12:54
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
so MqttIO can be used through the portable SchemaTransform API and exposed
as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
@DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
config round-trips through Beam Schemas.

Both batch and streaming are supported on the read side: omitting
maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
while setting either bounds it to a batch read. The provider descriptions
document this and note that streaming requires a portable streaming runner
(e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
not execute portable streaming cross-language reads.

Tests cover read-with-timeout-no-data, an unbounded streaming read
(publish/collect/cancel), and a write-then-read round trip against an
embedded ActiveMQ broker.

Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts
it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Adds a new :sdks:java:io:messaging-expansion-service module that serves
messaging IOs (MQTT for now, with room for JMS/Solace later) instead of
adding MqttIO to the shared :sdks:java:io:expansion-service, per review
feedback from @Abacn and @chamikaramj.

Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml
under the new service with kafka-style names (ReadFromMqtt / WriteToMqtt),
skipping the core SchemaTransforms it bundles transitively (those are
generated from the Java IO expansion service). Regenerates
standard_external_transforms.yaml so the generated Python wrappers are
served by the messaging expansion service, and registers the new target in
the generateExternalTransformsConfig task and the xlang wrapper-validation
list.

The CHANGES.md announcement is deferred to the follow-up PR that sets up
the Xlang Messaging PostCommit, per review feedback.
outputs.upToDateWhen { false } in the shadowJar block was a workaround for
a corrupted gradle cache and is no longer needed (review feedback on
PR apache#38493).
@tkaymak tkaymak force-pushed the mqtt-xlang-schematransform branch from 9dfa6d8 to 9b0956f Compare June 12, 2026 12:57
@tkaymak

tkaymak commented Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for the review @Abacn! Both comments addressed (workaround line removed in both expansion-service modules and CHANGES.md entry deferred).

I'll follow up with a separate PR that sets up beam_PostCommit_Python_Xlang_Messaging_Direct modeled on the Xlang IO/GCP Direct workflows as suggested.
Direct runner with an MQTT broker via testcontainers, plus a Python cross-language IT (WriteToMqtt -> bounded ReadFromMqtt), and add the CHANGES.md announcement once that's green.
Agreed that Dataflow is out of scope for now, I would be open to pick that up as well later.

tkaymak added a commit to tkaymak/beam that referenced this pull request Jun 12, 2026
Sets up beam_PostCommit_Python_Xlang_Messaging_Direct, modeled on the
Xlang IO/GCP Direct PostCommits, to exercise the cross-language MQTT
transforms served by the messaging expansion service:

- New workflow + trigger file + workflows README entry.
- New CrossLanguageTask "messagingCrossLanguage" (collect marker
  uses_messaging_java_expansion_service) wired into xlangTasks, and a
  messagingCrossLanguagePostCommit aggregator task.
- New integration test xlang_mqttio_it_test.py: starts an Eclipse
  Mosquitto broker via testcontainers and covers
  - bounded ReadFromMqtt on the DirectRunner (max_num_records, with a
    continuous mosquitto_pub publisher inside the container since MQTT
    has no retention),
  - WriteToMqtt on the DirectRunner (verified with a mosquitto_sub
    subscriber),
  - the unbounded (streaming) path on the Prism runner: streaming
    ReadFromMqtt feeding WriteToMqtt on a second topic, observed via
    mosquitto_sub and then cancelled.
  The tests skip on Dataflow, where a testcontainers broker would not be
  reachable from the workers (a Dataflow variant would need a remotely
  hosted MQTT broker, like the hosted Kafka cluster used by the Xlang IO
  Dataflow suite).
- Re-adds the CHANGES.md announcement that was deferred from apache#38493.

Per review feedback from @Abacn on apache#38493.
@tkaymak

tkaymak commented Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

Run Java_Amazon-Web-Services2_IO_Direct PreCommit

@tkaymak

tkaymak commented Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

Status of the failing checks (none related to this PR):

The xlang check that validates this PR (beam_PreCommit_Xlang_Generated_Transforms) is green.
Will have a look at the failing tests to see if I can help fixing them

@Abacn Abacn merged commit 001093e into apache:master Jun 13, 2026
121 of 127 checks passed
@Abacn

Abacn commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants