[mqtt] Add portable MqttIO Read/Write transforms for batch and streaming (revives #32385)#38493
Conversation
c394159 to
c272fde
Compare
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request revives and enhances the MQTT IO support by introducing portable SchemaTransforms for both reading and writing. By registering these transforms in the expansion service and updating the Python wrapper generation, it allows users to perform batch and streaming MQTT operations seamlessly across language boundaries. The changes also include necessary schema annotations and validation to ensure robust cross-language interoperability. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces portable ReadFromMqtt and WriteToMqtt schema transforms for MQTT IO, enabling cross-language usage in Python. The changes include adding the read and write schema transform providers, updating configuration classes with schema annotations, registering the new transforms, and adding comprehensive unit tests. The review feedback highlights a few areas for improvement: correcting a placeholder typo (% instead of %s) in a precondition check, renaming a generic parameter foo to connectionConfiguration for better readability, and replacing System.out.println with the class logger in the test code.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
0a415ec to
c7d09d0
Compare
|
Thanks @gemini-code-assist, all three are fair, applied in
(Folded into the existing two commits to keep the history tidy rather than adding a fixup commit.) |
|
That's great to hear, @tkaymak! Thank you for confirming those fixes and for keeping the commit history clean. The changes look solid, and the addition of the portable MQTT IO transforms will be a valuable asset for the Beam community. Since you've addressed the feedback, this looks ready to move forward. |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
| permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 | ||
| implementation project(":sdks:java:extensions:kafka-factories") | ||
| permitUnusedDeclared project(":sdks:java:extensions:kafka-factories") | ||
| implementation project(":sdks:java:io:mqtt") |
There was a problem hiding this comment.
Also discussed with @chamikaramj we recommend put mqttio into a new expansion service module,
I'm considering either in io/mqtt/expansion-service or a bit generic io/messagin-expansion-service that serving messaging protocols (mqtt, and possibly jms, solace in the future)
There was a problem hiding this comment.
Thanks for the pointers @Abacn! I went with the broader option: a new :sdks:java:io:messaging-expansion-service module (rather than mqtt-specific), so JMS/Solace can be added to it later without another module or a rename.
- MqttIO is no longer added to
:sdks:java:io:expansion-service(that change is reverted). - The new module pulls in only
:sdks:java:expansion-service+:sdks:java:io:mqtt standard_expansion_services.yamlnow has a second entry for the messaging service emittingReadFromMqtt/WriteToMqtt; it skips the core SchemaTransforms it bundles transitively, which keep being generated from the io service. Regeneratedstandard_external_transforms.yamlaccordingly, and registered the new target ingenerateExternalTransformsConfigand the xlangservicesToGenerateFromlist.
Verified the new service end-to-end over xlang (Python → ReadFromMqtt, batch + streaming on Prism) against a local broker.
c7d09d0 to
2204a9d
Compare
Abacn
left a comment
There was a problem hiding this comment.
Thanks!
The next step would be setup a PostCommit for Xlang_Messaging, like what we did for Xlang_GCP and Xlang_IO:
Direct runner should be sufficient so that we can use testcontainers. Running on Dataflow we'd need to deploy a remote mqtt server and could be out-of-scope for now.
| attributes(["Multi-Release": true]) | ||
| } | ||
| mergeServiceFiles() | ||
| outputs.upToDateWhen { false } |
There was a problem hiding this comment.
outputs.upToDateWhen { false }
This line was a workaround for corrupted gradle cache. It should have been removed from io/expansion-service.build.gradle as well and we don't need it here
There was a problem hiding this comment.
Removed in 9b0956f6099 — from this module and from io/expansion-service/build.gradle as well (separate small commit, since that module is otherwise untouched by this PR).
| ## I/Os | ||
|
|
||
| * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). | ||
| * MqttIO: portable `ReadFromMqtt` / `WriteToMqtt` transforms now available in Python via cross-language (`apache_beam.io`), for both batch and streaming ([#32385](https://github.com/apache/beam/issues/32385)). |
There was a problem hiding this comment.
As usual let's defer the announcement until tests have setup and are green
There was a problem hiding this comment.
Makes sense — removed the CHANGES.md entry in 9b0956f6099. I'll re-add the announcement in the follow-up PR that sets up the Xlang Messaging PostCommit, once it's green.
Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider so MqttIO can be used through the portable SchemaTransform API and exposed as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the config round-trips through Beam Schemas. Both batch and streaming are supported on the read side: omitting maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read, while setting either bounds it to a batch read. The provider descriptions document this and note that streaming requires a portable streaming runner (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does not execute portable streaming cross-language reads. Tests cover read-with-timeout-no-data, an unbounded streaming read (publish/collect/cancel), and a write-then-read round trip against an embedded ActiveMQ broker. Revives the approved diff from PR apache#32385 (ahmedabu98, twosom) and adapts it to the post-apache#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
Adds a new :sdks:java:io:messaging-expansion-service module that serves messaging IOs (MQTT for now, with room for JMS/Solace later) instead of adding MqttIO to the shared :sdks:java:io:expansion-service, per review feedback from @Abacn and @chamikaramj. Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml under the new service with kafka-style names (ReadFromMqtt / WriteToMqtt), skipping the core SchemaTransforms it bundles transitively (those are generated from the Java IO expansion service). Regenerates standard_external_transforms.yaml so the generated Python wrappers are served by the messaging expansion service, and registers the new target in the generateExternalTransformsConfig task and the xlang wrapper-validation list. The CHANGES.md announcement is deferred to the follow-up PR that sets up the Xlang Messaging PostCommit, per review feedback.
outputs.upToDateWhen { false } in the shadowJar block was a workaround for
a corrupted gradle cache and is no longer needed (review feedback on
PR apache#38493).
9dfa6d8 to
9b0956f
Compare
|
Thanks for the review @Abacn! Both comments addressed (workaround line removed in both expansion-service modules and CHANGES.md entry deferred). I'll follow up with a separate PR that sets up |
Sets up beam_PostCommit_Python_Xlang_Messaging_Direct, modeled on the
Xlang IO/GCP Direct PostCommits, to exercise the cross-language MQTT
transforms served by the messaging expansion service:
- New workflow + trigger file + workflows README entry.
- New CrossLanguageTask "messagingCrossLanguage" (collect marker
uses_messaging_java_expansion_service) wired into xlangTasks, and a
messagingCrossLanguagePostCommit aggregator task.
- New integration test xlang_mqttio_it_test.py: starts an Eclipse
Mosquitto broker via testcontainers and covers
- bounded ReadFromMqtt on the DirectRunner (max_num_records, with a
continuous mosquitto_pub publisher inside the container since MQTT
has no retention),
- WriteToMqtt on the DirectRunner (verified with a mosquitto_sub
subscriber),
- the unbounded (streaming) path on the Prism runner: streaming
ReadFromMqtt feeding WriteToMqtt on a second topic, observed via
mosquitto_sub and then cancelled.
The tests skip on Dataflow, where a testcontainers broker would not be
reachable from the workers (a Dataflow variant would need a remotely
hosted MQTT broker, like the hosted Kafka cluster used by the Xlang IO
Dataflow suite).
- Re-adds the CHANGES.md announcement that was deferred from apache#38493.
Per review feedback from @Abacn on apache#38493.
|
Run Java_Amazon-Web-Services2_IO_Direct PreCommit |
|
Status of the failing checks (none related to this PR):
The xlang check that validates this PR ( |
|
Thank you! |
What
Revives the approved diff from PR #32385 (
Add portable Mqtt source and sink transforms) and wires the new SchemaTransforms into Python cross-language wrapper generation, for both batch and streaming.After this lands, Python users can do:
and reach
MqttIOover xlang, the same wayReadFromKafka/WriteToKafkawork today.How
Two commits:
[mqtt] Add SchemaTransform providers for MqttIO Read/WriteMqttIO.ConnectionConfigurationwith@DefaultSchema(AutoValueSchema.class)+@SchemaFieldDescriptionso it round-trips through Beam Schemas.MqttReadSchemaTransformProvider(beam:schematransform:org.apache.beam:mqtt_read:v1) andMqttWriteSchemaTransformProvider(beam:schematransform:org.apache.beam:mqtt_write:v1), both@AutoService-registered, each with adescription()documenting batch vs. streaming behaviour.:sdks:java:io:mqttinto:sdks:java:io:expansion-serviceso the providers are discoverable byExpansionService.[mqtt] Wire MqttIO into Python xlang wrapper generationsdks/standard_expansion_services.yamlso the generated wrappers use kafka-style naming (ReadFromMqtt/WriteToMqtt).sdks/standard_external_transforms.yamlvia:sdks:python:generateExternalTransformsConfig.CHANGES.mdfor 2.75.0.Batch and streaming
The read SchemaTransform is unbounded by default: omitting
maxNumRecords/maxReadTimeSecondsproduces a streaming (unbounded)PCollection; setting either bounds it to a batch read. The write SchemaTransform works with both bounded and unbounded inputs.The streaming-mode failure that @twosom flagged on the original PR (comment — batch worked, streaming did not) was root-caused: it is a limitation of the legacy local Python DirectRunner, which does not execute portable streaming cross-language unbounded reads — not a defect in
MqttIO. On a portable streaming runner the unbounded read/write work end to end.Verification
Built the io expansion-service shadowJar + a matching
2.75.0.devJava SDK harness image + the Python SDK from source, and ran against a local Mosquitto broker:max_read_time_seconds)PeriodicImpulse)The provider
description()documents that streaming requires a portable streaming runner (Prism, Flink, Dataflow).Java unit tests (
:sdks:java:io:mqtt:check) pass on both Java 11 and Java 17 (-PtestJavaVersion=17).Notes vs. PR #32385
MqttIO#32668 API:MqttIO.Read<byte[]>/MqttIO.Write<byte[]>instead of the raw types in the original PR.ReadFromMqtt/WriteToMqtt(kafka-style), per @Abacn's roadmap comment about onboarding throughstandard_expansion_services.yaml.topicis nowOptionalin the generated schema becauseMqttIO.ConnectionConfiguration#getTopic()was made@Nullableby PR #32668 (readWithMetadata).Credits
Original work by @ahmedabu98 and @twosom on PR #32385; @damondouglas approved that PR before it went stale and auto-closed on 2025-10-14. This revives that change, adds streaming support + tests, and the small adjustments above.
Closes the gap from #32385 / addresses #21060 (Python MQTT IO).