diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java
index 2ed75d7bc7e0..a169d0759b86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java
@@ -68,7 +68,7 @@ public class BigtableReadSchemaTransformProvider
"column_families",
Schema.FieldType.STRING,
Schema.FieldType.map(
- Schema.FieldType.STRING,
+ Schema.FieldType.BYTES,
Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
.build();
public static final Schema FLATTENED_ROW_SCHEMA =
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java
new file mode 100644
index 000000000000..b8e566ed7239
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB.
+ *
+ *
+ * Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbReadSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+ MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> {
+
+ private static final String OUTPUT_TAG = "output";
+
+ @Override
+ protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() {
+ return MongoDbReadSchemaTransformConfiguration.class;
+ }
+
+ @Override
+ protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) {
+ return new MongoDbReadSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ // Return a unique URN for the transform.
+ return "beam:schematransform:org.apache.beam:mongodb_read:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ // A read transform does not have an input PCollection.
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> outputCollectionNames() {
+ // The primary output is a PCollection of Rows.
+ // Error handling could be added later with a second "errors" output tag.
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /** Configuration class for the MongoDB Read transform. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable {
+
+ @SchemaFieldDescription("The connection URI for the MongoDB server.")
+ public abstract String getUri();
+
+ @SchemaFieldDescription("The MongoDB database to read from.")
+ public abstract String getDatabase();
+
+ @SchemaFieldDescription("The MongoDB collection to read from.")
+ public abstract String getCollection();
+
+ @SchemaFieldDescription(
+ "An optional BSON filter to apply to the read. This should be a valid JSON string.")
+ @Nullable
+ public abstract String getFilter();
+
+ public void validate() {
+ checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+ checkArgument(
+ getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+ checkArgument(
+ getCollection() != null && !getCollection().isEmpty(),
+ "MongoDB collection must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration
+ .Builder();
+ }
+
+ /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setUri(String uri);
+
+ public abstract Builder setDatabase(String database);
+
+ public abstract Builder setCollection(String collection);
+
+ public abstract Builder setFilter(String filter);
+
+ public abstract MongoDbReadSchemaTransformConfiguration build();
+ }
+ }
+
+ /** The {@link SchemaTransform} that performs the read operation. */
+ private static class MongoDbReadSchemaTransform extends SchemaTransform {
+ private final MongoDbReadSchemaTransformConfiguration configuration;
+
+ MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ // A read transform does not have an input, so we start with the pipeline.
+ PCollection<Document> mongoDocs =
+ input
+ .getPipeline()
+ .apply(
+ "ReadFromMongoDb",
+ MongoDbIO.read()
+ .withUri(configuration.getUri())
+ .withDatabase(configuration.getDatabase())
+ .withCollection(configuration.getCollection()));
+ // TODO: Add support for .withFilter() if it exists in your MongoDbIO,
+ // using configuration.getFilter().
+
+ // Convert the BSON Document objects into Beam Row objects.
+ PCollection<Row> beamRows =
+ mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn()));
+
+ return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
+ }
+ }
+
+ /**
+ * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}.
+ *
+ * This is a critical step to ensure data is in a schema-aware format.
+ */
+ private static class MongoDocumentToRowFn extends DoFn<Document, Row> {
+ // TODO: Define the Beam Schema that corresponds to your MongoDB documents.
+ // This could be made dynamic based on an inferred schema or a user-provided schema.
+ // For this skeleton, we assume a static schema.
+ // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build();
+
+ @ProcessElement
+ public void processElement(@Element Document doc, OutputReceiver<Row> out) {
+ // Here you will convert the BSON document to a Beam Row.
+ // This requires you to know the target schema.
+
+ // Example pseudo-code:
+ // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA);
+ // for (Map.Entry entry : doc.entrySet()) {
+ // rowBuilder.addValue(entry.getValue());
+ // }
+ // out.output(rowBuilder.build());
+
+ // For a robust implementation, you would handle data type conversions
+ // between BSON types and Beam schema types.
+ throw new UnsupportedOperationException(
+ "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows.");
+ }
+ }
+}
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java
new file mode 100644
index 000000000000..7554489229fb
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.bson.Document;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB.
+ *
+ * Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+ MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> {
+
+ private static final String INPUT_TAG = "input";
+
+ @Override
+ protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) {
+ return new MongoDbWriteSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:mongodb_write:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /** Configuration class for the MongoDB Write transform. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable {
+
+ @SchemaFieldDescription("The connection URI for the MongoDB server.")
+ public abstract String getUri();
+
+ @SchemaFieldDescription("The MongoDB database to write to.")
+ public abstract String getDatabase();
+
+ @SchemaFieldDescription("The MongoDB collection to write to.")
+ public abstract String getCollection();
+
+ // @SchemaFieldDescription("The number of documents to include in each batch write.")
+ // @Nullable
+ // public abstract Long getBatchSize();
+ //
+ // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.")
+ // @Nullable
+ // public abstract Boolean getOrdered();
+
+ public void validate() {
+ checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified.");
+ checkArgument(
+ getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified.");
+ checkArgument(
+ getCollection() != null && !getCollection().isEmpty(),
+ "MongoDB collection must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration
+ .Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setUri(String uri);
+
+ public abstract Builder setDatabase(String database);
+
+ public abstract Builder setCollection(String collection);
+
+ // Setters for properties that are currently commented out above. AutoValue rejects
+ // builder setters with no matching abstract property, so these stay disabled together.
+ // public abstract Builder setBatchSize(Long batchSize);
+
+ public abstract MongoDbWriteSchemaTransformConfiguration build();
+ }
+ }
+
+ /** The {@link SchemaTransform} that performs the write operation. */
+ private static class MongoDbWriteSchemaTransform extends SchemaTransform {
+ private final MongoDbWriteSchemaTransformConfiguration configuration;
+
+ MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> rows = input.get(INPUT_TAG);
+
+ PCollection<Document> documents =
+ rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn()));
+
+ MongoDbIO.Write write =
+ MongoDbIO.write()
+ .withUri(configuration.getUri())
+ .withDatabase(configuration.getDatabase())
+ .withCollection(configuration.getCollection());
+
+ // if (configuration.getBatchSize() != null) {
+ // write = write.withBatchSize(configuration.getBatchSize());
+ // }
+ // if (configuration.getOrdered() != null) {
+ // write = write.withOrdered(configuration.getOrdered());
+ // }
+
+ documents.apply("WriteToMongo", write);
+
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+ }
+
+ /** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link Document}. */
+ private static class RowToMongoDocumentFn extends DoFn<Row, Document> {
+ @ProcessElement
+ public void processElement(@Element Row row, OutputReceiver<Document> out) {
+ Document doc = new Document();
+ for (int i = 0; i < row.getSchema().getFieldCount(); i++) {
+ String fieldName = row.getSchema().getField(i).getName();
+ Object value = row.getValue(i);
+
+ if (value != null) {
+ doc.append(fieldName, value);
+ }
+ }
+ out.output(doc);
+ }
+ }
+ /** Converts a Beam {@link Row} to a BSON {@link Document}. */
+ static class RowToBsonDocumentFn extends DoFn<Row, Document> {
+ @ProcessElement
+ public void processElement(@Element Row row, OutputReceiver<Document> out) {
+ Document doc = new Document();
+ for (Field field : row.getSchema().getFields()) {
+ doc.append(field.getName(), row.getValue(field.getName()));
+ }
+ out.output(doc);
+ }
+ }
+}
diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py
index 38fa2689268e..62368c126336 100644
--- a/sdks/python/apache_beam/yaml/integration_tests.py
+++ b/sdks/python/apache_beam/yaml/integration_tests.py
@@ -45,6 +45,7 @@
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.google import PubSubContainer
from testcontainers.kafka import KafkaContainer
+from testcontainers.mongodb import MongoDbContainer
from testcontainers.mssql import SqlServerContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.postgres import PostgresContainer
@@ -201,6 +202,60 @@ def temp_bigtable_table(project, prefix='yaml_bt_it_'):
_LOGGER.warning("Failed to clean up instance")
+@contextlib.contextmanager
+def temp_mongodb_table():
+ """
+ Provides a temporary MongoDB instance.
+
+ Starts a MongoDB container, creates a unique database
+ and collection name for test isolation, and yields them as a dictionary.
+
+ This allows YAML test files to get connection details without hardcoding them.
+ Example usage in a YAML test file's fixture section:
+
+ fixtures:
+ - name: mongo_vars
+ type: path.to.this.file.temp_mongodb_table
+
+ Then, in the pipeline definition, you can use placeholders like:
+ - uri: ${mongo_vars.URI}
+ - database: ${mongo_vars.DATABASE}
+ - collection: ${mongo_vars.COLLECTION}
+ """
+ _LOGGER.info("Setting up MongoDB fixture...")
+ # Initialize and start the MongoDB container.
+ # This will pull the 'mongo:7.0.7' image if it's not available locally.
+ mongo_container = MongoDbContainer("mongo:7.0.7")
+ try:
+ mongo_container.start()
+
+ # Get the dynamically generated connection URI.
+ mongo_uri = mongo_container.get_connection_url()
+
+ # Generate a unique database and collection name for this test run to ensure
+ # isolation between different test files.
+ db_name = f'db_{uuid.uuid4().hex}'
+ collection_name = f'collection_{uuid.uuid4().hex}'
+
+ _LOGGER.info(
+ "MongoDB container started. URI: [%s], DB: [%s], Collection: [%s]",
+ mongo_uri,
+ db_name,
+ collection_name)
+
+ yield {
+ 'URI': mongo_uri,
+ 'DATABASE': db_name,
+ 'COLLECTION': collection_name,
+ }
+
+ finally:
+ # This block executes after the test suite finishes.
+ _LOGGER.info("Tearing down MongoDB fixture...")
+ mongo_container.stop()
+ _LOGGER.info("MongoDB container stopped.")
+
+
@contextlib.contextmanager
def temp_sqlite_database(prefix='yaml_jdbc_it_'):
"""Context manager to provide a temporary SQLite database via JDBC for
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 3d619c187076..e1c176c98965 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -397,3 +397,20 @@
'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
config:
gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+# MongoDB
+- type: renaming
+ transforms:
+ 'WriteToMongoDB': 'WriteToMongoDB'
+ config:
+ mappings:
+ 'WriteToMongoDB':
connection_uri: "uri"
+ database: "database"
+ collection: "collection"
+ underlying_provider:
+ type: beamJar
+ transforms:
+ 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1'
+ config:
gradle_target: 'sdks:java:io:mongodb:shadowJar'
diff --git a/sdks/python/apache_beam/yaml/tests/mongodb.yaml b/sdks/python/apache_beam/yaml/tests/mongodb.yaml
new file mode 100644
index 000000000000..e69de29bb2d1