diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 32894b978094..2a2e2529af38 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -72,7 +72,6 @@ shadowJar { attributes(["Multi-Release": true]) } mergeServiceFiles() - outputs.upToDateWhen { false } } description = "Apache Beam :: SDKs :: Java :: IO :: Expansion Service" diff --git a/sdks/java/io/messaging-expansion-service/build.gradle b/sdks/java/io/messaging-expansion-service/build.gradle new file mode 100644 index 000000000000..6cdf67b86d6e --- /dev/null +++ b/sdks/java/io/messaging-expansion-service/build.gradle @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.messaging.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +shadowJar { + manifest { + attributes(["Multi-Release": true]) + } + mergeServiceFiles() +} + +description = "Apache Beam :: SDKs :: Java :: IO :: Messaging Expansion Service" +ext.summary = "Expansion service serving messaging IOs (e.g. MQTT)" + +dependencies { + implementation project(":sdks:java:expansion-service") + permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 + implementation project(":sdks:java:io:mqtt") + permitUnusedDeclared project(":sdks:java:io:mqtt") // BEAM-11761 + runtimeOnly library.java.slf4j_jdk14 +} + +task runExpansionService (type: JavaExec) { + mainClass = "org.apache.beam.sdk.expansion.service.ExpansionService" + classpath = sourceSets.test.runtimeClasspath + args = [project.findProperty("constructionService.port") ?: "8097"] +} diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 78876eb6534d..72449c0697ae 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -37,7 +37,10 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.transforms.ParDo; @@ -205,13 +208,17 @@ public static Write dynamicWrite() { private MqttIO() {} /** A POJO describing a MQTT connection. */ + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class ConnectionConfiguration implements Serializable { + @SchemaFieldDescription("The MQTT broker URI.") abstract String getServerUri(); + @SchemaFieldDescription("The MQTT topic pattern.") abstract @Nullable String getTopic(); + @SchemaFieldDescription("The client ID prefix, which is used to construct a unique client ID.") abstract @Nullable String getClientId(); abstract @Nullable String getUsername();
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
new file mode 100644
index 000000000000..b83d9bba9f48
--- /dev/null
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.joda.time.Duration;
+
+/**
+ * A {@link SchemaTransformProvider} exposing {@link MqttIO#read()} as a SchemaTransform.
+ *
+ * <p>The transform takes no input and emits a single {@code "output"} PCollection of rows, each
+ * carrying one MQTT payload in a {@code bytes} field.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MqttReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<ReadConfiguration> {
+
+  /** Schema of the emitted rows: a single {@code bytes} field holding the message payload. */
+  private static final Schema OUTPUT_SCHEMA = Schema.builder().addByteArrayField("bytes").build();
+
+  /** Configuration of the MQTT read: the connection plus optional bounds on the read. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class ReadConfiguration implements Serializable {
+    public static Builder builder() {
+      return new AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder();
+    }
+
+    @SchemaFieldDescription("Configuration options to set up the MQTT connection.")
+    public abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @SchemaFieldDescription(
+        "The max number of records to receive. Setting this will result in a bounded PCollection.")
+    @Nullable
+    public abstract Long getMaxNumRecords();
+
+    @SchemaFieldDescription(
+        "The maximum time for this source to read messages. "
+            + "Setting this will result in a bounded PCollection.")
+    @Nullable
+    public abstract Long getMaxReadTimeSeconds();
+
+    /** Builder for {@link ReadConfiguration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setConnectionConfiguration(
+          ConnectionConfiguration connectionConfiguration);
+
+      public abstract Builder setMaxNumRecords(Long maxNumRecords);
+
+      public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds);
+
+      public abstract ReadConfiguration build();
+    }
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mqtt_read:v1";
+  }
+
+  @Override
+  public String description() {
+    return "Reads messages from an MQTT broker and outputs each payload as a single `bytes` "
+        + "field.\n"
+        + "\n"
+        + "By default the read is unbounded (streaming): it keeps consuming messages from the "
+        + "subscribed topic until the pipeline is stopped. Setting `maxNumRecords` and/or "
+        + "`maxReadTimeSeconds` bounds the read, producing a bounded (batch) PCollection.\n"
+        + "\n"
+        + "Note: streaming reads require a runner that supports portable streaming (e.g. Prism, "
+        + "Flink, or Dataflow). The legacy local Python DirectRunner does not execute portable "
+        + "streaming cross-language reads.";
+  }
+
+  @Override
+  protected SchemaTransform from(ReadConfiguration configuration) {
+    return new MqttReadSchemaTransform(configuration);
+  }
+
+  /** Wraps each raw MQTT payload into a single-field {@link Row}. */
+  private static class PayloadToRowFn extends DoFn<byte[], Row> {
+    @ProcessElement
+    public void processElement(@Element byte[] payload, OutputReceiver<Row> out) {
+      out.output(Row.withSchema(OUTPUT_SCHEMA).addValue(payload).build());
+    }
+  }
+
+  /** The transform built from a {@link ReadConfiguration}. */
+  private static class MqttReadSchemaTransform extends SchemaTransform {
+    private final ReadConfiguration config;
+
+    MqttReadSchemaTransform(ReadConfiguration configuration) {
+      this.config = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      Preconditions.checkState(
+          input.getAll().isEmpty(),
+          "Expected zero input PCollections for this source, but found: %s",
+          input.getAll().keySet());
+
+      MqttIO.Read<byte[]> readTransform =
+          MqttIO.read().withConnectionConfiguration(config.getConnectionConfiguration());
+
+      // Each bound is optional; when neither is set the read stays unbounded.
+      Long maxRecords = config.getMaxNumRecords();
+      if (maxRecords != null) {
+        readTransform = readTransform.withMaxNumRecords(maxRecords);
+      }
+      Long maxReadTimeSeconds = config.getMaxReadTimeSeconds();
+      if (maxReadTimeSeconds != null) {
+        readTransform =
+            readTransform.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
+      }
+
+      PCollection<Row> outputRows =
+          input
+              .getPipeline()
+              .apply(readTransform)
+              .apply("Wrap in Beam Rows", ParDo.of(new PayloadToRowFn()))
+              .setRowSchema(OUTPUT_SCHEMA);
+
+      return PCollectionRowTuple.of("output", outputRows);
+    }
+  }
+}
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java new file mode 100644 index 000000000000..95ee00c8c3a2 --- /dev/null +++
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * A {@link SchemaTransformProvider} exposing {@link MqttIO#write()} as a SchemaTransform.
+ *
+ * <p>The transform expects a single input PCollection of rows with exactly one {@code bytes}
+ * field and publishes each value as one MQTT message. It produces no output.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MqttWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<WriteConfiguration> {
+
+  /** Configuration of the MQTT write: the connection plus the optional retained flag. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class WriteConfiguration implements Serializable {
+    public static Builder builder() {
+      return new AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder();
+    }
+
+    @SchemaFieldDescription("Configuration options to set up the MQTT connection.")
+    public abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @SchemaFieldDescription(
+        "Whether or not the publish message should be retained by the messaging engine. "
+            + "When a subscriber connects, it gets the latest retained message. "
+            + "Defaults to `False`, which will clear the retained message from the server.")
+    @Nullable
+    public abstract Boolean getRetained();
+
+    /** Builder for {@link WriteConfiguration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setConnectionConfiguration(
+          ConnectionConfiguration connectionConfiguration);
+
+      public abstract Builder setRetained(Boolean retained);
+
+      public abstract WriteConfiguration build();
+    }
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mqtt_write:v1";
+  }
+
+  @Override
+  public String description() {
+    return "Publishes messages to an MQTT broker. Expects an input PCollection of rows with a "
+        + "single `bytes` field, each of which is published as one MQTT message.\n"
+        + "\n"
+        + "Works with both bounded (batch) and unbounded (streaming) input PCollections.";
+  }
+
+  @Override
+  protected SchemaTransform from(WriteConfiguration configuration) {
+    return new MqttWriteSchemaTransform(configuration);
+  }
+
+  /** Extracts the payload from the first (and only) field of each input {@link Row}. */
+  private static class RowToPayloadFn extends DoFn<Row, byte[]> {
+    @ProcessElement
+    public void processElement(@Element Row row, OutputReceiver<byte[]> out) {
+      out.output(org.apache.beam.sdk.util.Preconditions.checkStateNotNull(row.getBytes(0)));
+    }
+  }
+
+  /** The transform built from a {@link WriteConfiguration}. */
+  private static class MqttWriteSchemaTransform extends SchemaTransform {
+    private final WriteConfiguration config;
+
+    MqttWriteSchemaTransform(WriteConfiguration configuration) {
+      this.config = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> inputRows = input.getSinglePCollection();
+      Schema inputSchema = inputRows.getSchema();
+
+      Preconditions.checkState(
+          inputSchema.getFieldCount() == 1
+              && inputSchema.getField(0).getType().equals(Schema.FieldType.BYTES),
+          "Expected only one Schema field containing bytes, but instead received: %s",
+          inputSchema);
+
+      MqttIO.Write<byte[]> writeTransform =
+          MqttIO.write().withConnectionConfiguration(config.getConnectionConfiguration());
+      // Only override the retained flag when the caller set it explicitly.
+      Boolean retained = config.getRetained();
+      if (retained != null) {
+        writeTransform = writeTransform.withRetained(retained);
+      }
+
+      inputRows.apply("Extract bytes", ParDo.of(new RowToPayloadFn())).apply(writeTransform);
+
+      return PCollectionRowTuple.empty(inputRows.getPipeline());
+    }
+  }
+}
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java new file mode 100644 index 000000000000..60bdd1104dbe --- /dev/null +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java @@
-0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.common.NetworkTestHelper;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link MqttReadSchemaTransformProvider} and {@link MqttWriteSchemaTransformProvider}
+ * against an in-process ActiveMQ broker.
+ */
+public class MqttSchemaTransformProviderTest {
+  private static final Logger LOG = LoggerFactory.getLogger(MqttSchemaTransformProviderTest.class);
+
+  /** Collects the bytes field of every output row into a shared queue (DirectRunner is in-JVM). */
+  private static final ConcurrentLinkedQueue<String> STREAMING_RECEIVED =
+      new ConcurrentLinkedQueue<>();
+
+  private BrokerService brokerService;
+
+  private int port;
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Appends the UTF-8 decoded {@code bytes} field of each row to {@link #STREAMING_RECEIVED}. */
+  private static class CollectBytesFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      byte[] bytes = row.getBytes("bytes");
+      if (bytes != null) {
+        STREAMING_RECEIVED.add(new String(bytes, StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  @Before
+  public void startBroker() throws Exception {
+    port = NetworkTestHelper.getAvailableLocalPort();
+    LOG.info("Starting ActiveMQ brokerService on {}", port);
+    brokerService = new BrokerService();
+    brokerService.setDeleteAllMessagesOnStartup(true);
+    // Use in-memory persistence: it is faster and does not pollute the test folder with KahaDB.
+    brokerService.setPersistent(false);
+    brokerService.addConnector("mqtt://localhost:" + port);
+    brokerService.start();
+    brokerService.waitUntilStarted();
+  }
+
+  @After
+  public void stopBroker() throws Exception {
+    if (brokerService != null) {
+      brokerService.stop();
+      brokerService.waitUntilStopped();
+      brokerService = null;
+    }
+  }
+
+  /** A read bounded by {@code maxReadTimeSeconds} terminates even when no message arrives. */
+  @Test(timeout = 30 * 1000)
+  public void testReceiveWithTimeoutAndNoData() {
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC")
+                    .withClientId("READ_PIPELINE"))
+            .setMaxReadTimeSeconds(2L)
+            .build();
+
+    PCollectionRowTuple.empty(pipeline)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration));
+
+    // should stop before the test timeout
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Reads in streaming mode: when neither {@code maxNumRecords} nor {@code maxReadTimeSeconds} is
+   * set the SchemaTransform produces an unbounded PCollection. Verifies that messages published
+   * after the reader subscribes flow through continuously, then cancels the running pipeline.
+   */
+  @Test(timeout = 90 * 1000)
+  public void testReadUnboundedStreaming() throws Exception {
+    STREAMING_RECEIVED.clear();
+    final String topicName = "STREAM_READ_TOPIC";
+    final int expected = 10;
+
+    // No bound -> unbounded (streaming) read.
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topicName)
+                    .withClientId("STREAM_READ_PIPELINE"))
+            .build();
+
+    // Use a local pipeline so run() does not block (the read never terminates on its own).
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--blockOnRun=false").withoutStrictParsing().create();
+    Pipeline p = Pipeline.create(options);
+    PCollectionRowTuple.empty(p)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration))
+        .get("output")
+        .apply(ParDo.of(new CollectBytesFn()));
+
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection publishConnection = client.blockingConnection();
+    publishConnection.connect();
+
+    // Publish a steady stream of messages until the reader has consumed enough. The flag is an
+    // AtomicBoolean so that clearing it from the test thread is visible to the publisher thread;
+    // with a plain shared variable the loop, and therefore publisher.join(), might never end.
+    final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+    Thread publisher =
+        new Thread(
+            () -> {
+              int i = 0;
+              try {
+                while (keepPublishing.get()) {
+                  publishConnection.publish(
+                      topicName,
+                      ("stream-" + i).getBytes(StandardCharsets.UTF_8),
+                      QoS.AT_LEAST_ONCE,
+                      false);
+                  i++;
+                  Thread.sleep(200);
+                }
+              } catch (Exception e) {
+                // Not fatal by itself: the assertion on the received count decides the outcome.
+                LOG.warn("Publisher stopped after {} messages", i, e);
+              }
+            });
+
+    PipelineResult result = p.run();
+    try {
+      publisher.start();
+
+      // Wait until the unbounded read delivers a meaningful number of records.
+      long deadline = System.currentTimeMillis() + 60 * 1000;
+      while (STREAMING_RECEIVED.size() < expected && System.currentTimeMillis() < deadline) {
+        Thread.sleep(500);
+      }
+    } finally {
+      // Always release the publisher, its connection and the pipeline, which never ends by itself.
+      keepPublishing.set(false);
+      publisher.join();
+      publishConnection.disconnect();
+      result.cancel();
+    }
+
+    assertTrue(
+        "expected at least " + expected + " streamed records, got " + STREAMING_RECEIVED.size(),
+        STREAMING_RECEIVED.size() >= expected);
+    for (String received : STREAMING_RECEIVED) {
+      assertTrue("unexpected payload: " + received, received.startsWith("stream-"));
+    }
+  }
+
+  /** Every row written through the SchemaTransform is delivered to an MQTT subscriber. */
+  @Test(timeout = 60 * 1000)
+  public void testWrite() throws Exception {
+    final int numberOfTestMessages = 200;
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection connection = client.blockingConnection();
+    connection.connect();
+    connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
+
+    final Set<String> messages = new ConcurrentSkipListSet<>();
+
+    Thread subscriber =
+        new Thread(
+            () -> {
+              try {
+                for (int i = 0; i < numberOfTestMessages; i++) {
+                  Message message = connection.receive();
+                  String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+                  messages.add(payload);
+                  message.ack();
+                  LOG.info("message: {}", payload);
+                }
+              } catch (Exception e) {
+                LOG.error("Can't receive message", e);
+              }
+            });
+    subscriber.start();
+
+    List<byte[]> data = new ArrayList<>(numberOfTestMessages);
+    for (int i = 0; i < numberOfTestMessages; i++) {
+      data.add(("Test " + i).getBytes(StandardCharsets.UTF_8));
+    }
+
+    MqttWriteSchemaTransformProvider.WriteConfiguration writeConfiguration =
+        MqttWriteSchemaTransformProvider.WriteConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "WRITE_TOPIC"))
+            .build();
+    Schema dataSchema = Schema.builder().addByteArrayField("bytes").build();
+
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                MapElements.into(TypeDescriptors.rows())
+                    .via(d -> Row.withSchema(dataSchema).addValue(d).build()))
+            .setRowSchema(dataSchema);
+    PCollectionRowTuple.of("input", inputRows)
+        .apply(new MqttWriteSchemaTransformProvider().from(writeConfiguration));
+    pipeline.run().waitUntilFinish();
+    // Bounded by the test timeout: if a message is lost the subscriber would wait forever.
+    subscriber.join();
+
+    connection.disconnect();
+
+    assertEquals(numberOfTestMessages, messages.size());
+    for (int i = 0; i < numberOfTestMessages; i++) {
+      assertTrue(messages.contains("Test " + i));
+    }
+  }
+}
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index b39b12f198e9..9e2fe232c42c 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -73,6 +73,7 @@ tasks.register("generateExternalTransformsConfig") { // Need to build all expansion services listed in sdks/standard_expansion_services.yaml dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build" dependsOn ":sdks:java:io:expansion-service:build" + dependsOn ":sdks:java:io:messaging-expansion-service:build" // Keep this in-sync with pyproject.toml def PyYaml = "'pyyaml>=3.12,<7.0.0'" diff --git
a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle index 3065ad8377e3..1cbbaa0db534 100644 --- a/sdks/python/test-suites/xlang/build.gradle +++ b/sdks/python/test-suites/xlang/build.gradle @@ -25,6 +25,7 @@ project.evaluationDependsOn(":sdks:python") // relevant fields as done here, then add it to `xlangTasks`. def gcpExpansionPath = project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath() def ioExpansionPath = project.project(':sdks:java:io:expansion-service').getPath() +def messagingExpansionPath = project.project(':sdks:java:io:messaging-expansion-service').getPath() // Properties that are common across runners. // Used to launch the expansion service, collect the right tests, and cleanup afterwards def gcpXlang = new CrossLanguageTask().tap { @@ -42,7 +43,7 @@ def ioXlang = new CrossLanguageTask().tap { } // This list should include all expansion service targets in sdks/python/standard_expansion_services.yaml -def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath] +def servicesToGenerateFrom = [ioExpansionPath, messagingExpansionPath, gcpExpansionPath] def xlangWrapperValidation = new CrossLanguageTask().tap { name = "xlangWrapperValidation" expansionProjectPaths = servicesToGenerateFrom diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml index 531caca5a376..79c7e06280df 100644 --- a/sdks/standard_expansion_services.yaml +++ b/sdks/standard_expansion_services.yaml @@ -53,6 +53,21 @@ - 'beam:schematransform:org.apache.beam:iceberg_read:v1' - 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' +- gradle_target: 'sdks:java:io:messaging-expansion-service:shadowJar' + destinations: + python: 'apache_beam/io' + transforms: + 'beam:schematransform:org.apache.beam:mqtt_write:v1': + name: 'WriteToMqtt' + 'beam:schematransform:org.apache.beam:mqtt_read:v1': + name: 'ReadFromMqtt' + skip_transforms: + # Core SchemaTransforms bundled via 
:sdks:java:expansion-service; already + # generated from the Java IO expansion service above. + - 'beam:schematransform:org.apache.beam:generate_sequence:v1' + - 'beam:schematransform:org.apache.beam:tfrecord_read:v1' + - 'beam:schematransform:org.apache.beam:tfrecord_write:v1' + # TODO(ahmedabu98): Enable this service in a future PR #- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' # destinations: diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 057c4e3f47d1..f26438eb85ea 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,7 +19,7 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2026-05-06 +# Last updated on: 2026-06-11 - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 @@ -145,3 +145,58 @@ type: str identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1 name: TfrecordWrite +- default_service: sdks:java:io:messaging-expansion-service:shadowJar + description: 'Reads messages from an MQTT broker and outputs each payload as a single + `bytes` field. + + + By default the read is unbounded (streaming): it keeps consuming messages from + the subscribed topic until the pipeline is stopped. Setting `maxNumRecords` and/or + `maxReadTimeSeconds` bounds the read, producing a bounded (batch) PCollection. + + + Note: streaming reads require a runner that supports portable streaming (e.g. + Prism, Flink, or Dataflow). The legacy local Python DirectRunner does not execute + portable streaming cross-language reads.' + destinations: + python: apache_beam/io + fields: + - description: Configuration options to set up the MQTT connection. 
+ name: connection_configuration + nullable: false + type: Row(client_id=typing.Optional[str], password=typing.Optional[str], server_uri=<class 'str'>, topic=typing.Optional[str], username=typing.Optional[str]) + - description: The max number of records to receive. Setting this will result in + a bounded PCollection. + name: max_num_records + nullable: true + type: int64 + - description: The maximum time for this source to read messages. Setting this will + result in a bounded PCollection. + name: max_read_time_seconds + nullable: true + type: int64 + identifier: beam:schematransform:org.apache.beam:mqtt_read:v1 + name: ReadFromMqtt +- default_service: sdks:java:io:messaging-expansion-service:shadowJar + description: 'Publishes messages to an MQTT broker. Expects an input PCollection + of rows with a single `bytes` field, each of which is published as one MQTT message. + + + Works with both bounded (batch) and unbounded (streaming) input PCollections.' + destinations: + python: apache_beam/io + fields: + - description: Configuration options to set up the MQTT connection. + name: connection_configuration + nullable: false + type: Row(client_id=typing.Optional[str], password=typing.Optional[str], server_uri=<class 'str'>, topic=typing.Optional[str], username=typing.Optional[str]) + - description: Whether or not the publish message should be retained by the messaging + engine. When a subscriber connects, it gets the latest retained message. Defaults + to `False`, which will clear the retained message from the server.
+ name: retained + nullable: true + type: boolean + identifier: beam:schematransform:org.apache.beam:mqtt_write:v1 + name: WriteToMqtt diff --git a/settings.gradle.kts b/settings.gradle.kts index 443d9c567752..3d4346661a47 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -236,6 +236,7 @@ include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-8") include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-9") include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common") include(":sdks:java:io:expansion-service") +include(":sdks:java:io:messaging-expansion-service") include(":sdks:java:io:file-based-io-tests") include(":sdks:java:io:bigquery-io-perf-tests") include(":sdks:java:io:cdap")