diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml index ae52cbb68e17..99a11bbab11b 100644 --- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml +++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml @@ -96,6 +96,9 @@ jobs: ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Stream.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_One_Hot_Encoding_Batch.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_GPU_Batch.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Text_Embedding_Batch.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_CPU_Batch.txt # The env variables are created and populated in the test-arguments-action as "_test_arguments_" - name: get current time run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV @@ -238,4 +241,37 @@ jobs: -PpythonVersion=3.10 \ -PbeamPythonExtra=ml_test \ -PloadTest.requirementsTxtFile=apache_beam/ml/transforms/mltransform_tests_requirements.txt \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_12 }} --autoscaling_algorithm=NONE --metrics_table=mltransform_one_hot_encoding_batch --influx_measurement=mltransform_one_hot_encoding_batch --job_name=benchmark-tests-mltransform-one-hot-encoding-batch-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/mltransform/one_hot_output_${{env.NOW_UTC}} --artifact_location=gs://temp-storage-for-end-to-end-tests/mltransform/artifacts_${{env.NOW_UTC}}' \ No newline at end of file + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_12 }} --autoscaling_algorithm=NONE --metrics_table=mltransform_one_hot_encoding_batch --influx_measurement=mltransform_one_hot_encoding_batch --job_name=benchmark-tests-mltransform-one-hot-encoding-batch-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/mltransform/one_hot_output_${{env.NOW_UTC}} --artifact_location=gs://temp-storage-for-end-to-end-tests/mltransform/artifacts_${{env.NOW_UTC}}' + - name: run MLTransform Image Embedding GPU Batch + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.mltransform_image_embedding_benchmark \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_13 }} --artifact_location=gs://temp-storage-for-perf-tests/loadtests/mltransform_image_embedding_gpu/artifacts-${{env.NOW_UTC}} --job_name=benchmark-tests-mltransform-image-embedding-gpu-batch-${{env.NOW_UTC}}' + - name: run MLTransform Text Embedding Batch + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.mltransform_text_embedding_benchmark \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_14 }} --output=gs://temp-storage-for-perf-tests/loadtests/mltransform_text_embedding/results-${{env.NOW_UTC}} --artifact_location=gs://temp-storage-for-perf-tests/loadtests/mltransform_text_embedding/artifacts-${{env.NOW_UTC}} --job_name=benchmark-tests-mltransform-text-embedding-batch-${{env.NOW_UTC}}' + - name: run MLTransform Image Embedding CPU Batch + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.mltransform_image_embedding_benchmark \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_15 }} --artifact_location=gs://temp-storage-for-perf-tests/loadtests/mltransform_image_embedding_cpu/artifacts-${{env.NOW_UTC}} --job_name=benchmark-tests-mltransform-image-embedding-cpu-batch-${{env.NOW_UTC}}' diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_CPU_Batch.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_CPU_Batch.txt new file mode 100644 index 000000000000..37c47ca25542 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_CPU_Batch.txt @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--runner=DataflowRunner +--project=apache-beam-testing +--region=us-central1 +--num_workers=5 +--max_num_workers=20 +--disk_size_gb=100 +--autoscaling_algorithm=THROUGHPUT_BASED +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--metrics_table=mltransform_image_embedding_cpu_batch +--input_options={} +--influx_measurement=mltransform_image_embedding_cpu_batch +--mode=batch +--input=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt +--artifact_location=gs://temp-storage-for-perf-tests/loadtests/mltransform_image_embedding_cpu/artifacts +--output_table=apache-beam-testing:beam_run_inference.result_mltransform_image_embedding_cpu_batch +--pretrained_model_name=clip-ViT-B-32 +--device=CPU +--min_batch_size=8 +--max_batch_size=64 +--embedding_min_ram=16GB +--dataflow_service_options=enable_prime +--experiments=use_runner_v2 +--timeout_ms=3600000 diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_GPU_Batch.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_GPU_Batch.txt new file mode 100644 index 000000000000..274dc43f9416 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Image_Embedding_GPU_Batch.txt @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--runner=DataflowRunner +--project=apache-beam-testing +--region=us-central1 +--num_workers=5 +--max_num_workers=20 +--disk_size_gb=100 +--autoscaling_algorithm=THROUGHPUT_BASED +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--metrics_table=mltransform_image_embedding_gpu_batch +--input_options={} +--influx_measurement=mltransform_image_embedding_gpu_batch +--mode=batch +--input=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt +--artifact_location=gs://temp-storage-for-perf-tests/loadtests/mltransform_image_embedding_gpu/artifacts +--output_table=apache-beam-testing:beam_run_inference.result_mltransform_image_embedding_gpu_batch +--pretrained_model_name=clip-ViT-B-32 +--device=GPU +--min_batch_size=8 +--max_batch_size=64 +--embedding_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver +--sdk_location=container +--sdk_container_image=us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest +--dataflow_service_options=enable_prime +--experiments=use_runner_v2 +--timeout_ms=3600000 diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Text_Embedding_Batch.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Text_Embedding_Batch.txt new file mode 100644 index 000000000000..1835dc260025 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Text_Embedding_Batch.txt @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--runner=DataflowRunner +--project=apache-beam-testing +--region=us-central1 +--machine_type=n1-standard-4 +--num_workers=10 +--max_num_workers=50 +--disk_size_gb=100 +--autoscaling_algorithm=THROUGHPUT_BASED +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--metrics_table=mltransform_text_embedding_batch +--input_options={} +--influx_measurement=mltransform_text_embedding_batch +--input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt +--output=gs://temp-storage-for-perf-tests/loadtests/mltransform_text_embedding/results +--artifact_location=gs://temp-storage-for-perf-tests/loadtests/mltransform_text_embedding/artifacts +--model_name=sentence-transformers/all-MiniLM-L6-v2 +--min_batch_size=16 +--max_batch_size=128 +--model_batch_size=32 +--device=CPU +--sdk_location=container +--sdk_container_image=us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest +--experiments=use_runner_v2 diff --git a/.test-infra/tools/refresh_looker_metrics.py b/.test-infra/tools/refresh_looker_metrics.py index c8d66f4a4bd3..69798e6f24e8 100644 --- a/.test-infra/tools/refresh_looker_metrics.py +++ b/.test-infra/tools/refresh_looker_metrics.py @@ -46,7 +46,10 @@ ("96", ["270", "304", "305", "353", "354"]), # Table Row Inference Sklearn Batch ("106", ["355", "356", "357", "358", "359"]), # Table Row Inference Sklearn Streaming ("107", ["360", "361", "362", "363", "364"]), # MLTransform Generate Vocab Batch - ("108", ["365", "366", "367", "368", "369"]) # MLTransform One-Hot Encoding Batch + ("108", ["365", "366", "367", "368", "369"]), # MLTransform One-Hot Encoding Batch + ("109", ["375", "376", "377", "378", "379"]), # MLTransform Text Embedding Batch + ("110", ["380", "381", "382", "383", "385"]), # MLTransform Image Embedding GPU Batch + ("111", ["370", "371", "372", "373", "374"]), # MLTransform Image Embedding CPU Batch ] def get_look(id: str) -> models.Look: diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding.py new file mode 100644 index 000000000000..c0c7b180a175 --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding.py @@ -0,0 +1,263 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Batch image embedding pipeline using MLTransform. + +The pipeline reads image URIs from a text file, decodes images with Pillow, +generates SentenceTransformers image embeddings through MLTransform, and writes +results to BigQuery using batch file loads. +""" + +import argparse +import hashlib +import io +import logging +import time +from collections.abc import Iterable +from typing import Any + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerEmbeddings +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult +from PIL import Image + +IMAGE_COLUMN = 'image' +IMAGE_ID_COLUMN = 'image_id' +IMAGE_URI_COLUMN = 'image_uri' + +DEFAULT_IMAGE_MODEL_NAME = 'clip-ViT-B-32' +DEFAULT_ACCELERATOR = 'type:nvidia-tesla-t4;count:1;install-nvidia-driver' +DEFAULT_EMBEDDING_MIN_RAM = '16GB' + +OUTPUT_TABLE_SCHEMA = { + 'fields': [ + { + 'name': 'image_id', 'type': 'STRING' + }, + { + 'name': 'image_uri', 'type': 'STRING' + }, + { + 'name': 'model_name', 'type': 'STRING' + }, + { + 'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED' + }, + { + 'name': 'embedding_dim', 'type': 'INT64' + }, + { + 'name': 'infer_ms', 'type': 'INT64' + }, + ] +} + + +def now_millis() -> int: + return int(time.time() * 1000) + + +def sha1_hex(value: str) -> str: + return hashlib.sha1(value.encode('utf-8')).hexdigest() + + +def filter_empty_uri(uri: str) -> Iterable[str]: + uri = uri.strip() + if uri: + yield uri + + +def load_image_from_uri(uri: str) -> bytes: + with FileSystems.open(uri) as file: + return file.read() + + +def decode_pil(image_bytes: bytes) -> Image.Image: + with Image.open(io.BytesIO(image_bytes)) as image: + image = image.convert('RGB') + image.load() + return image + + +class ReadImage(beam.DoFn): + def process(self, uri: str) -> Iterable[dict[str, Any]]: + image_id = sha1_hex(uri) + try: + yield { + IMAGE_ID_COLUMN: image_id, + IMAGE_URI_COLUMN: uri, + IMAGE_COLUMN: decode_pil(load_image_from_uri(uri)), + } + except Exception as exc: + logging.warning( + 'Failed to read or decode image %s (%s): %s', image_id, uri, exc) + + +def _as_dict(row: Any) -> dict[str, Any]: + if hasattr(row, 'as_dict'): + return row.as_dict() + if hasattr(row, '_asdict'): + return row._asdict() + return dict(row) + + +def embedding_to_list(value: Any) -> list[float]: + if hasattr(value, 'tolist'): + return value.tolist() + return [float(item) for item in value] + + +class FormatImageEmbeddingOutput(beam.DoFn): + def __init__(self, model_name: str): + self.model_name = model_name + + def process(self, row: Any) -> Iterable[dict[str, Any]]: + row = _as_dict(row) + embedding = embedding_to_list(row[IMAGE_COLUMN]) + yield { + IMAGE_ID_COLUMN: row[IMAGE_ID_COLUMN], + IMAGE_URI_COLUMN: row[IMAGE_URI_COLUMN], + 'model_name': self.model_name, + 'embedding': embedding, + 'embedding_dim': len(embedding), + 'infer_ms': now_millis(), + } + + +def _str_to_bool(value: str) -> bool: + if value.lower() == 'true': + return True + if value.lower() == 'false': + return False + raise argparse.ArgumentTypeError( + f'"true" or "false" expected, got "{value}" instead.') + + +def parse_known_args(argv): + parser = argparse.ArgumentParser() + parser.add_argument('--mode', default='batch', choices=['batch']) + parser.add_argument( + '--input', required=True, help='Path to a text file with image URIs.') + parser.add_argument( + '--output_table', required=True, help='BigQuery table for embeddings.') + parser.add_argument( + '--publish_to_big_query', + type=_str_to_bool, + default=True, + help='Whether to write embedding rows to BigQuery.') + parser.add_argument( + '--artifact_location', + required=True, + help='Path where MLTransform artifacts are written.') + parser.add_argument( + '--pretrained_model_name', + default=DEFAULT_IMAGE_MODEL_NAME, + help='SentenceTransformers image model name.') + parser.add_argument( + '--device', + default='CPU', + choices=['CPU', 'GPU'], + help='Device used by SentenceTransformers on the worker.') + parser.add_argument( + '--min_batch_size', + type=int, + default=8, + help='Minimum Beam inference batch size.') + parser.add_argument( + '--max_batch_size', + type=int, + default=64, + help='Maximum Beam inference batch size.') + parser.add_argument( + '--embedding_accelerator', + default=DEFAULT_ACCELERATOR, + help='GPU accelerator resource hint for the MLTransform embedding step.') + parser.add_argument( + '--embedding_min_ram', + default=DEFAULT_EMBEDDING_MIN_RAM, + help='CPU right-fitting min RAM resource hint for the embedding step.') + return parser.parse_known_args(argv) + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + known_args, pipeline_args = parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + pipeline_options.view_as(StandardOptions).streaming = False + + device = 'cuda' if known_args.device == 'GPU' else 'cpu' + embedding_transform = SentenceTransformerEmbeddings( + model_name=known_args.pretrained_model_name, + columns=[IMAGE_COLUMN], + image_model=True, + min_batch_size=known_args.min_batch_size, + max_batch_size=known_args.max_batch_size, + load_model_args={'device': device}, + inference_args={ + 'convert_to_numpy': True, + 'show_progress_bar': False, + }) + + ml_transform = MLTransform( + write_artifact_location=known_args.artifact_location).with_transform( + embedding_transform) + if known_args.device == 'GPU' and known_args.embedding_accelerator: + ml_transform = ml_transform.with_resource_hints( + accelerator=known_args.embedding_accelerator) + elif known_args.embedding_min_ram: + ml_transform = ml_transform.with_resource_hints( + min_ram=known_args.embedding_min_ram) + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + rows = ( + pipeline + | 'ReadImageURIs' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmptyURIs' >> beam.FlatMap(filter_empty_uri) + | 'ReshuffleBeforeEmbedding' >> beam.Reshuffle() + | 'ReadImages' >> beam.ParDo(ReadImage())) + results = ( + rows + | 'MLTransformImageEmbeddings' >> ml_transform + | 'FormatOutput' >> beam.ParDo( + FormatImageEmbeddingOutput( + model_name=known_args.pretrained_model_name))) + + if known_args.publish_to_big_query: + _ = ( + results + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( + known_args.output_table, + schema=OUTPUT_TABLE_SCHEMA, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) + + result = pipeline.run() + if not test_pipeline: + result.wait_until_finish() + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding_test.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding_test.py new file mode 100644 index 000000000000..2dd8a46c043e --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding_test.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import io +import unittest + +import apache_beam as beam +from apache_beam.io.gcp import bigquery_tools +from PIL import Image + +from apache_beam.examples.ml_transform import mltransform_image_embedding + + +class FakeArray: + def __init__(self, value): + self.value = value + + def tolist(self): + return self.value + + +class MLTransformImageEmbeddingTest(unittest.TestCase): + def test_filter_empty_uri(self): + self.assertEqual( + list(mltransform_image_embedding.filter_empty_uri(' ')), []) + self.assertEqual( + list(mltransform_image_embedding.filter_empty_uri(' gs://bucket/img ')), + ['gs://bucket/img']) + + def test_sha1_hex(self): + self.assertEqual( + mltransform_image_embedding.sha1_hex('gs://bucket/img.jpg'), + 'f3557ebc47344e0189d39dcad3642c8e8afbc10a') + + def test_decode_pil_returns_rgb_image(self): + image = Image.new('RGBA', (2, 2), color=(255, 0, 0, 255)) + image_bytes = io.BytesIO() + image.save(image_bytes, format='PNG') + + decoded = mltransform_image_embedding.decode_pil(image_bytes.getvalue()) + + self.assertEqual(decoded.mode, 'RGB') + self.assertEqual(decoded.size, (2, 2)) + + def test_embedding_to_list(self): + self.assertEqual( + mltransform_image_embedding.embedding_to_list(FakeArray([1, 2.5])), + [1, 2.5]) + + def test_format_output_with_dict(self): + row = { + 'image_id': 'abc', + 'image_uri': 'gs://bucket/img.jpg', + 'image': FakeArray([0.1, 0.2, 0.3]), + } + + output = next( + mltransform_image_embedding.FormatImageEmbeddingOutput( + 'clip-ViT-B-32').process(row)) + + self.assertEqual(output['image_id'], 'abc') + self.assertEqual(output['image_uri'], 'gs://bucket/img.jpg') + self.assertEqual(output['model_name'], 'clip-ViT-B-32') + self.assertEqual(output['embedding'], [0.1, 0.2, 0.3]) + self.assertEqual(output['embedding_dim'], 3) + self.assertIn('infer_ms', output) + + def test_output_table_schema_marks_embedding_as_repeated(self): + schema = bigquery_tools.get_dict_table_schema( + mltransform_image_embedding.OUTPUT_TABLE_SCHEMA) + embedding_field = next( + field for field in schema['fields'] if field['name'] == 'embedding') + self.assertEqual(embedding_field['type'], 'FLOAT64') + self.assertEqual(embedding_field['mode'], 'REPEATED') + + def test_format_output_with_beam_row(self): + row = beam.Row( + image_id='abc', + image_uri='gs://bucket/img.jpg', + image=FakeArray([0.1, 0.2]), + ) + + output = next( + mltransform_image_embedding.FormatImageEmbeddingOutput( + 'clip-ViT-B-32').process(row)) + + self.assertEqual(output['embedding_dim'], 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding.py new file mode 100644 index 000000000000..b692ac322e09 --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding.py @@ -0,0 +1,186 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Batch text embedding pipeline using MLTransform. + +The pipeline reads text lines, generates sentence-transformer embeddings through +MLTransform, and writes JSONL output to a sharded text sink. +""" + +import argparse +import hashlib +import json +import logging +from collections.abc import Iterable +from typing import Any + +import apache_beam as beam +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.ml.transforms.embeddings.huggingface import SentenceTransformerEmbeddings +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.runners.runner import PipelineResult + +ID_COLUMN = 'id' +RAW_TEXT_COLUMN = 'raw_text' +TEXT_COLUMN = 'text' + +DEFAULT_MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2' + + +def _str_to_bool(value: str) -> bool: + if value.lower() == 'true': + return True + if value.lower() == 'false': + return False + raise argparse.ArgumentTypeError( + f'"true" or "false" expected, got "{value}" instead.') + + +def parse_known_args(argv): + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', required=True, help='Path to the input text file.') + parser.add_argument( + '--output', + required=True, + help='Output prefix for sharded JSONL embedding results.') + parser.add_argument( + '--artifact_location', + required=True, + help='Path where MLTransform artifacts are written.') + parser.add_argument( + '--model_name', + default=DEFAULT_MODEL_NAME, + help='SentenceTransformers model name.') + parser.add_argument( + '--min_batch_size', + type=int, + default=16, + help='Minimum Beam inference batch size.') + parser.add_argument( + '--max_batch_size', + type=int, + default=128, + help='Maximum Beam inference batch size.') + parser.add_argument( + '--model_batch_size', + type=int, + default=32, + help='Batch size passed to SentenceTransformer.encode.') + parser.add_argument( + '--device', + default='CPU', + choices=['CPU', 'GPU'], + help='Device used by SentenceTransformers on the worker.') + parser.add_argument( + '--large_model', + type=_str_to_bool, + default=False, + help='Whether RunInference should share the model across processes.') + return parser.parse_known_args(argv) + + +def text_to_record(line: str) -> Iterable[dict[str, str]]: + text = line.strip() + if not text: + return + + yield { + ID_COLUMN: hashlib.sha256(text.encode('utf-8')).hexdigest(), + RAW_TEXT_COLUMN: text, + TEXT_COLUMN: text, + } + + +def _as_dict(row: Any) -> dict[str, Any]: + if hasattr(row, 'as_dict'): + return row.as_dict() + if hasattr(row, '_asdict'): + return row._asdict() + return dict(row) + + +def embedding_to_list(value: Any) -> list[float]: + if hasattr(value, 'tolist'): + return value.tolist() + return [float(item) for item in value] + + +class FormatEmbeddingOutput(beam.DoFn): + def __init__(self, model_name: str): + self.model_name = model_name + + def process(self, row: Any) -> Iterable[str]: + row = _as_dict(row) + embedding = embedding_to_list(row[TEXT_COLUMN]) + yield json.dumps({ + ID_COLUMN: row[ID_COLUMN], + 'model_name': self.model_name, + RAW_TEXT_COLUMN: row[RAW_TEXT_COLUMN], + 'embedding': embedding, + 'embedding_dim': len(embedding) if isinstance(embedding, list) else 0, + }, + sort_keys=True) + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + known_args, pipeline_args = parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + device = 'cuda' if known_args.device == 'GPU' else 'cpu' + embedding_transform = SentenceTransformerEmbeddings( + model_name=known_args.model_name, + columns=[TEXT_COLUMN], + min_batch_size=known_args.min_batch_size, + max_batch_size=known_args.max_batch_size, + large_model=known_args.large_model, + load_model_args={'device': device}, + inference_args={ + 'batch_size': known_args.model_batch_size, + 'convert_to_numpy': True, + 'show_progress_bar': False, + }) + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + records = ( + pipeline + | 'ReadTextLines' >> beam.io.ReadFromText(known_args.input) + | 'ToEmbeddingRecords' >> beam.FlatMap(text_to_record)) + embedded = ( + records + | 'MLTransformTextEmbeddings' >> MLTransform( + write_artifact_location=known_args.artifact_location).with_transform( + embedding_transform)) + _ = ( + embedded + | 'FormatOutput' >> beam.ParDo( + FormatEmbeddingOutput(model_name=known_args.model_name)) + | 'WriteOutput' >> beam.io.WriteToText( + known_args.output, file_name_suffix='.jsonl')) + + result = pipeline.run() + if not test_pipeline: + result.wait_until_finish() + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding_test.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding_test.py new file mode 100644 index 000000000000..dc22cc42b680 --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding_test.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import hashlib +import json +import unittest + +import apache_beam as beam + +from apache_beam.examples.ml_transform import mltransform_text_embedding + + +class FakeArray: + def __init__(self, value): + self.value = value + + def tolist(self): + return self.value + + +class MLTransformTextEmbeddingTest(unittest.TestCase): + def test_text_to_record_skips_empty_lines(self): + self.assertEqual(list(mltransform_text_embedding.text_to_record(' ')), []) + + def test_text_to_record_preserves_text_and_adds_stable_id(self): + records = list( + mltransform_text_embedding.text_to_record(' Apache Beam is fun ')) + + self.assertEqual(len(records), 1) + expected_id = hashlib.sha256(b'Apache Beam is fun').hexdigest() + self.assertEqual( + records[0], + { + 'id': expected_id, + 'raw_text': 'Apache Beam is fun', + 'text': 'Apache Beam is fun', + }) + + def test_embedding_to_list(self): + self.assertEqual( + mltransform_text_embedding.embedding_to_list(FakeArray([1, 2.5])), + [1, 2.5]) + + def test_format_embedding_output_with_dict(self): + row = { + 'id': 'abc', + 'raw_text': 'hello', + 'text': FakeArray([0.1, 0.2, 0.3]), + } + + output = next( + mltransform_text_embedding.FormatEmbeddingOutput('test-model').process( + row)) + + self.assertEqual( + json.loads(output), + { + 'id': 'abc', + 'raw_text': 'hello', + 'model_name': 'test-model', + 'embedding': [0.1, 0.2, 0.3], + 'embedding_dim': 3, + }) + + def test_format_embedding_output_with_beam_row(self): + row = beam.Row( + id='abc', + raw_text='hello', + text=FakeArray([0.1, 0.2]), + ) + + output = next( + mltransform_text_embedding.FormatEmbeddingOutput('test-model').process( + row)) + + self.assertEqual(json.loads(output)['embedding_dim'], 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt b/sdks/python/apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt new file mode 100644 index 000000000000..77019383dd77 --- /dev/null +++ b/sdks/python/apache_beam/ml/transforms/mltransform_embedding_tests_requirements.txt @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +sentence-transformers~=5.0.0 +Pillow>=9.0.0 +google-cloud-monitoring>=2.27.0 diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_image_embedding_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_image_embedding_benchmark.py new file mode 100644 index 000000000000..3a35379671f7 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_image_embedding_benchmark.py @@ -0,0 +1,227 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +from typing import Optional + +from google.cloud import monitoring_v3 +from google.protobuf.duration_pb2 import Duration + +from apache_beam.examples.ml_transform import mltransform_image_embedding +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import WorkerOptions +from apache_beam.testing.load_tests import dataflow_cost_consts as costs +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark +from apache_beam.testing.load_tests.load_test import LoadTestOptions + + +class MLTransformImageEmbeddingOptions( + LoadTestOptions, + StandardOptions, + GoogleCloudOptions, + WorkerOptions, + DebugOptions, + SetupOptions, +): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--mode', default='batch') + parser.add_argument('--input', default='') + parser.add_argument('--input_file', default='') + parser.add_argument('--output_table', default='') + parser.add_argument('--artifact_location', default='') + parser.add_argument( + '--pretrained_model_name', + default=mltransform_image_embedding.DEFAULT_IMAGE_MODEL_NAME) + parser.add_argument('--device', default='CPU') + parser.add_argument('--min_batch_size', type=int, default=8) + parser.add_argument('--max_batch_size', type=int, default=64) + parser.add_argument( + '--embedding_accelerator', + default=mltransform_image_embedding.DEFAULT_ACCELERATOR) + parser.add_argument( + '--embedding_min_ram', + default=mltransform_image_embedding.DEFAULT_EMBEDDING_MIN_RAM) + + +class MLTransformImageEmbeddingBenchmarkTest(DataflowCostBenchmark): + options_class = MLTransformImageEmbeddingOptions + + def __init__(self): + self.metrics_namespace = 'BeamML_MLTransform' + super().__init__( + metrics_namespace=self.metrics_namespace, + pcollection='FormatOutput.out0') + self.opts = self.pipeline.get_pipeline_options().view_as( + MLTransformImageEmbeddingOptions) + if self.opts.device == 'GPU': + self.gpu = costs.Accelerator.T4 + + def _get_worker_time_interval( + self, job_id: str) -> tuple[Optional[str], Optional[str]]: + """GPU jobs use separate CPU and GPU worker pools. + + The shared base class stops paging logs after the first start/stop pair, + which can end the monitoring window before the GPU pool finishes. Scan all + pages and use the earliest start and latest stop for GPU runs only. + """ + if self.opts.device != 'GPU': + return super()._get_worker_time_interval(job_id) + + start_time, end_time = None, None + page_token = None + message_count = 0 + last_message_time = None + while True: + messages, page_token = self.dataflow_client.list_messages( + job_id=job_id, + start_time=None, + end_time=None, + page_token=page_token, + minimum_importance='JOB_MESSAGE_DEBUG') + for message in messages: + message_count += 1 + text = getattr(message, 'messageText', None) or getattr( + message, 'message_text', None) + if getattr(message, 'time', None): + last_message_time = message.time + if not text: + continue + if self.WORKER_START_PATTERN.search(text): + if start_time is None or message.time < start_time: + start_time = message.time + logging.info('Matched WORKER_START_PATTERN: %r', text) + if self.WORKER_STOP_PATTERN.search(text): + if end_time is None or message.time > end_time: + end_time = message.time + logging.info('Matched WORKER_STOP_PATTERN: %r', text) + if not page_token: + break + if start_time and not end_time and last_message_time: + end_time = last_message_time + logging.info( + 'Using last job message time as end_time for GPU job: %s', end_time) + if not start_time or not end_time: + logging.warning( + 'Could not determine GPU worker time interval. ' + 'start_time=%s, end_time=%s, total messages=%d', + start_time, + end_time, + message_count) + return start_time, end_time + + def _get_throughput_metrics( + self, + project: str, + job_id: str, + start_time: str, + end_time: str, + pcollection_name: str | None = None) -> dict[str, float]: + pcollection_candidates = [ + pcollection_name or self.pcollection, + 'MLTransformImageEmbeddings.out0', + 'MLTransformImageEmbeddings/RunInference.out0', + 'MLTransformImageEmbeddings/RunInference/' + 'BeamML_RunInference_Postprocess-0.out0', + 'WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobs.out0', + ] + seen = set() + for candidate in pcollection_candidates: + if not candidate or candidate in seen: + continue + seen.add(candidate) + metrics = super()._get_throughput_metrics( + project, job_id, start_time, end_time, candidate) + if (metrics.get('AvgThroughputBytes', 0) > 0 or + metrics.get('AvgThroughputElements', 0) > 0): + logging.info('Using throughput metrics for PCollection %s', candidate) + return metrics + + logging.warning( + 'No PCollection-level throughput metrics found for candidates %s. ' + 'Falling back to job-level Dataflow throughput metrics.', + pcollection_candidates) + return self._get_job_level_throughput_metrics( + project, job_id, start_time, end_time) + + def _get_job_level_throughput_metrics( + self, project: str, job_id: str, start_time: str, + end_time: str) -> dict[str, float]: + interval = monitoring_v3.TimeInterval( + start_time=start_time, end_time=end_time) + aggregation = monitoring_v3.Aggregation( + alignment_period=Duration(seconds=60), + per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN) + requests = { + 'Bytes': monitoring_v3.ListTimeSeriesRequest( + name=f'projects/{project}', + filter=f'metric.type=' + f'"dataflow.googleapis.com/job/estimated_byte_count" ' + f'AND metric.labels.job_id="{job_id}"', + interval=interval, + aggregation=aggregation), + 'Elements': monitoring_v3.ListTimeSeriesRequest( + name=f'projects/{project}', + filter=f'metric.type="dataflow.googleapis.com/job/element_count" ' + f'AND metric.labels.job_id="{job_id}"', + interval=interval, + aggregation=aggregation), + } + metrics = {} + for key, request in requests.items(): + values = [ + point.value.double_value + for series in self.monitoring_client.list_time_series( + request=request) for point in series.points + ] + metrics[f'AvgThroughput{key}'] = sum(values) / len( + values) if values else 0.0 + return metrics + + def test(self): + input_path = self.opts.input or self.opts.input_file + if not input_path: + raise RuntimeError('Please provide --input or --input_file.') + if not self.opts.output_table: + raise RuntimeError('Please provide --output_table.') + if not self.opts.artifact_location: + raise RuntimeError('Please provide --artifact_location.') + + extra_opts = { + 'mode': self.opts.mode, + 'input': input_path, + 'output_table': self.opts.output_table, + 'artifact_location': self.opts.artifact_location, + 'pretrained_model_name': self.opts.pretrained_model_name, + 'device': self.opts.device, + 'min_batch_size': self.opts.min_batch_size, + 'max_batch_size': self.opts.max_batch_size, + 'embedding_accelerator': self.opts.embedding_accelerator, + 'embedding_min_ram': self.opts.embedding_min_ram, + } + + self.result = mltransform_image_embedding.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + MLTransformImageEmbeddingBenchmarkTest().run() diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_text_embedding_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_text_embedding_benchmark.py new file mode 100644 index 000000000000..e08d0f696c85 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_text_embedding_benchmark.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging + +from google.cloud import monitoring_v3 +from google.protobuf.duration_pb2 import Duration + +from apache_beam.examples.ml_transform import mltransform_text_embedding +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import WorkerOptions +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark +from apache_beam.testing.load_tests.load_test import LoadTestOptions + + +class MLTransformTextEmbeddingOptions( + LoadTestOptions, + StandardOptions, + GoogleCloudOptions, + WorkerOptions, + DebugOptions, + SetupOptions, +): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--input', default='') + parser.add_argument('--input_file', default='') + parser.add_argument('--output', default='') + parser.add_argument('--artifact_location', default='') + parser.add_argument( + '--model_name', default=mltransform_text_embedding.DEFAULT_MODEL_NAME) + parser.add_argument('--min_batch_size', type=int, default=16) + parser.add_argument('--max_batch_size', type=int, default=128) + parser.add_argument('--model_batch_size', type=int, default=32) + parser.add_argument('--device', default='CPU') + parser.add_argument('--large_model', default='false') + + +class MLTransformTextEmbeddingBenchmarkTest(DataflowCostBenchmark): + options_class = MLTransformTextEmbeddingOptions + + def __init__(self): + self.metrics_namespace = 'BeamML_MLTransform' + super().__init__( + metrics_namespace=self.metrics_namespace, + pcollection='FormatOutput.out0') + self.opts = self.pipeline.get_pipeline_options().view_as( + MLTransformTextEmbeddingOptions) + + def _get_throughput_metrics( + self, + project: str, + job_id: str, + start_time: str, + end_time: str, + pcollection_name: str | None = None) -> dict[str, float]: + pcollection_candidates = [ + pcollection_name or self.pcollection, + 'MLTransformTextEmbeddings.out0', + 'MLTransformTextEmbeddings/RunInference.out0', + 'MLTransformTextEmbeddings/RunInference/' + 'BeamML_RunInference_Postprocess-0.out0', + 'WriteOutput/Write/WriteImpl/FinalizeWrite.out0', + ] + seen = set() + for candidate in pcollection_candidates: + if not candidate or candidate in seen: + continue + seen.add(candidate) + metrics = super()._get_throughput_metrics( + project, job_id, start_time, end_time, candidate) + if (metrics.get('AvgThroughputBytes', 0) > 0 or + metrics.get('AvgThroughputElements', 0) > 0): + logging.info('Using throughput metrics for PCollection %s', candidate) + return metrics + + logging.warning( + 'No PCollection-level throughput metrics found for candidates %s. ' + 'Falling back to job-level Dataflow throughput metrics.', + pcollection_candidates) + return self._get_job_level_throughput_metrics( + project, job_id, start_time, end_time) + + def _get_job_level_throughput_metrics( + self, project: str, job_id: str, start_time: str, + end_time: str) -> dict[str, float]: + interval = monitoring_v3.TimeInterval( + start_time=start_time, end_time=end_time) + aggregation = monitoring_v3.Aggregation( + alignment_period=Duration(seconds=60), + per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN) + requests = { + 'Bytes': monitoring_v3.ListTimeSeriesRequest( + name=f'projects/{project}', + filter=f'metric.type=' + f'"dataflow.googleapis.com/job/estimated_byte_count" ' + f'AND metric.labels.job_id="{job_id}"', + interval=interval, + aggregation=aggregation), + 'Elements': monitoring_v3.ListTimeSeriesRequest( + name=f'projects/{project}', + filter=f'metric.type="dataflow.googleapis.com/job/element_count" ' + f'AND metric.labels.job_id="{job_id}"', + interval=interval, + aggregation=aggregation), + } + metrics = {} + for key, request in requests.items(): + values = [ + point.value.double_value + for series in self.monitoring_client.list_time_series( + request=request) for point in series.points + ] + metrics[f'AvgThroughput{key}'] = sum(values) / len( + values) if values else 0.0 + return metrics + + def test(self): + input_path = self.opts.input or self.opts.input_file + if not input_path: + raise RuntimeError('Please provide --input or --input_file.') + if not self.opts.output: + raise RuntimeError('Please provide --output.') + if not self.opts.artifact_location: + raise RuntimeError('Please provide --artifact_location.') + + extra_opts = { + 'input': input_path, + 'output': self.opts.output, + 'artifact_location': self.opts.artifact_location, + 'model_name': self.opts.model_name, + 'min_batch_size': self.opts.min_batch_size, + 'max_batch_size': self.opts.max_batch_size, + 'model_batch_size': self.opts.model_batch_size, + 'device': self.opts.device, + 'large_model': self.opts.large_model, + } + + self.result = mltransform_text_embedding.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + MLTransformTextEmbeddingBenchmarkTest().run() diff --git a/website/www/site/content/en/performance/_index.md b/website/www/site/content/en/performance/_index.md index a0eaba2aa0ec..347b7c1110c3 100644 --- a/website/www/site/content/en/performance/_index.md +++ b/website/www/site/content/en/performance/_index.md @@ -61,3 +61,6 @@ See the following pages for performance measures recorded when running various B - [Table Row Inference Sklearn Batch](/performance/tablerowinference) - [MLTransform Generate Vocab (batch)](/performance/mltransformvocab) - [MLTransform One-Hot Encoding](/performance/mltransformonehot) +- [MLTransform Text Embedding](/performance/mltransform-text-embedding) +- [MLTransform Image Embedding GPU](/performance/mltransform-image-embedding-gpu) +- [MLTransform Image Embedding CPU](/performance/mltransform-image-embedding-cpu) diff --git a/website/www/site/content/en/performance/mltransform-image-embedding-cpu/_index.md b/website/www/site/content/en/performance/mltransform-image-embedding-cpu/_index.md new file mode 100644 index 000000000000..6780093b244a --- /dev/null +++ b/website/www/site/content/en/performance/mltransform-image-embedding-cpu/_index.md @@ -0,0 +1,48 @@ +--- +title: "MLTransform Image Embedding CPU Performance" +--- + + + +# MLTransform Image Embedding CPU Performance + +**Model**: Sentence Transformers — clip-ViT-B-32 (image) +**Accelerator**: CPU with Dataflow Prime right-fitting (16 GB min RAM) +**Host**: Dataflow Prime with throughput-based autoscaling + +This batch pipeline reads image URIs from GCS, decodes images with Pillow, +generates image embeddings through `MLTransform` with +`SentenceTransformerEmbeddings(image_model=True)`, and writes results to +BigQuery using batch file loads. + +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available +[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="mltransform-image-embedding-cpu" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="mltransform-image-embedding-cpu" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="mltransform-image-embedding-cpu" read_or_write="write" section="date" >}} + +See also [MLTransform Image Embedding GPU](/performance/mltransform-image-embedding-gpu) +for the Tesla T4 GPU variant of this pipeline. diff --git a/website/www/site/content/en/performance/mltransform-image-embedding-gpu/_index.md b/website/www/site/content/en/performance/mltransform-image-embedding-gpu/_index.md new file mode 100644 index 000000000000..f5f5f1fee50c --- /dev/null +++ b/website/www/site/content/en/performance/mltransform-image-embedding-gpu/_index.md @@ -0,0 +1,48 @@ +--- +title: "MLTransform Image Embedding GPU Performance" +--- + + + +# MLTransform Image Embedding GPU Performance + +**Model**: Sentence Transformers — clip-ViT-B-32 (image) +**Accelerator**: NVIDIA Tesla T4 GPU +**Host**: Dataflow Prime with throughput-based autoscaling + +This batch pipeline reads image URIs from GCS, decodes images with Pillow, +generates image embeddings through `MLTransform` with +`SentenceTransformerEmbeddings(image_model=True)`, and writes results to +BigQuery using batch file loads. + +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available +[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml_transform/mltransform_image_embedding.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="mltransform-image-embedding-gpu" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="mltransform-image-embedding-gpu" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="mltransform-image-embedding-gpu" read_or_write="write" section="date" >}} + +See also [MLTransform Image Embedding CPU](/performance/mltransform-image-embedding-cpu) +for the CPU right-fitting variant of this pipeline. diff --git a/website/www/site/content/en/performance/mltransform-text-embedding/_index.md b/website/www/site/content/en/performance/mltransform-text-embedding/_index.md new file mode 100644 index 000000000000..051fb88b6c08 --- /dev/null +++ b/website/www/site/content/en/performance/mltransform-text-embedding/_index.md @@ -0,0 +1,44 @@ +--- +title: "MLTransform Text Embedding Performance" +--- + + + +# MLTransform Text Embedding Performance + +**Model**: Sentence Transformers — all-MiniLM-L6-v2 +**Accelerator**: CPU only +**Host**: 10 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This batch pipeline reads text lines from GCS, generates sentence embeddings +through `MLTransform` with `SentenceTransformerEmbeddings`, and writes JSONL +results to GCS. + +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available +[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml_transform/mltransform_text_embedding.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="mltransform-text-embedding" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="mltransform-text-embedding" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="mltransform-text-embedding" read_or_write="write" section="date" >}} diff --git a/website/www/site/data/performance.yaml b/website/www/site/data/performance.yaml index 42fb5bee92f5..c27dcc22f237 100644 --- a/website/www/site/data/performance.yaml +++ b/website/www/site/data/performance.yaml @@ -315,4 +315,51 @@ looks: title: AvgThroughputBytesPerSec by Version - id: Cn5FsXkdy2ZXCxCJshSCxcjsTW3TXf3c title: AvgThroughputElementsPerSec by Version - + mltransform-text-embedding: + write: + folder: 109 + cost: + - id: wm4mr82Q6MGcJZbvQF32MQB6vYMh3pBM + title: RunTime and EstimatedCost + date: + - id: dcCym4PGpqg3mTXsTXS5tfmTcJCVNTBD + title: AvgThroughputBytesPerSec by Date + - id: Q5sKJqB53Jkhy5TKCybfQgXjJnhp7pRQ + title: AvgThroughputElementsPerSec by Date + version: + - id: qdpHqnkzRPb2zqJr8sjCpW84KSQkhDHJ + title: AvgThroughputBytesPerSec by Version + - id: HcWgFVVct3xq2WjCMmY7Q8KcHGms7KDW + title: AvgThroughputElementsPerSec by Version + mltransform-image-embedding-gpu: + write: + folder: 110 + cost: + - id: zQZzvdcy3srprGq5qxqmRRN4GRHpdRDX + title: RunTime and EstimatedCost + date: + - id: X2wdRXcgsFG7sxfSQykhypFDkYtRjN2J + title: AvgThroughputBytesPerSec by Date + - id: zppHCwdN5qrDpyn2WQpJd8YmfQCPw4dk + title: AvgThroughputElementsPerSec by Date + version: + - id: FZXH7JKyFNWmrkgcFQZYxH7wywZgQpYF + title: AvgThroughputBytesPerSec by Version + - id: FQ3QYHgZTxvh3zPp3BcsGnmJx7yxfVzc + title: AvgThroughputElementsPerSec by Version + mltransform-image-embedding-cpu: + write: + folder: 111 + cost: + - id: dxnGs2xD2vNH4Y6HbFMDd5Hc2d5cqVWc + title: RunTime and EstimatedCost + date: + - id: tkwdB46WT8YDkR4Q37jJ2p7y5fGchmTC + title: AvgThroughputBytesPerSec by Date + - id: x3f5pRDvjKxVPy5Y6GjghhqSddYxprn3 + title: AvgThroughputElementsPerSec by Date + version: + - id: ZPHYbRdBMBSvd7RGS8wF8C8d8hysQmbH + title: AvgThroughputBytesPerSec by Version + - id: zjMRJssC6tNrtKPBgs8yPW8jHkbk8wjk + title: AvgThroughputElementsPerSec by Version