From 5ba07bab67b130c59fa08a7f0adefdb01804d5f9 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Thu, 12 Mar 2026 14:48:20 +0200 Subject: [PATCH 1/5] Implemented MLTransform generate vocab Dataflow benchmark --- ...m_Inference_Python_Benchmarks_Dataflow.yml | 12 + ...aflow_MLTransform_Generate_Vocab_Batch.txt | 44 +++ .test-infra/tools/refresh_looker_metrics.py | 3 +- .../examples/ml_transform/README.md | 130 ++++++++ .../mltransform_generate_vocab.py | 293 ++++++++++++++++++ .../mltransform_generate_vocab_test.py | 214 +++++++++++++ .../mltransform_tests_requirements.txt | 30 ++ .../mltransform_generate_vocab_benchmark.py | 58 ++++ .../load_tests/dataflow_cost_benchmark.py | 25 +- .../www/site/content/en/performance/_index.md | 1 + .../en/performance/mltransformvocab/_index.md | 37 +++ website/www/site/data/performance.yaml | 16 + 12 files changed, 861 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt create mode 100644 sdks/python/apache_beam/examples/ml_transform/README.md create mode 100644 sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py create mode 100644 sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py create mode 100644 sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt create mode 100644 sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py create mode 100644 website/www/site/content/en/performance/mltransformvocab/_index.md diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml index bc60b8bde351..173a060ad955 100644 --- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml +++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml @@ -94,6 +94,7 @@ jobs: ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Stream.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt # The env variables are created and populated in the test-arguments-action as "_test_arguments_" - name: get current time run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV @@ -214,3 +215,14 @@ jobs: -PpythonVersion=3.10 \ -PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \ '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}' + - name: run MLTransform Generate Vocab Batch + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.mltransform_generate_vocab_benchmark \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/transforms/mltransform_tests_requirements.txt \ + '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --job_name=benchmark-tests-mltransform-generate-vocab-batch-${{env.NOW_UTC}}' diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt new file mode 100644 index 000000000000..5d91f6ce08b2 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +--project=apache-beam-testing +--region=us-central1 +--runner=DataflowRunner +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--machine_type=n1-standard-4 +--disk_size_gb=100 +--num_workers=8 +--max_num_workers=16 +--autoscaling_algorithm=THROUGHPUT_BASED +--worker_zone=us-central1-b +--sdk_location=container +--requirements_file=apache_beam/ml/transforms/mltransform_tests_requirements.txt +--input_options={} +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--metrics_table=mltransform_generate_vocab_batch +--influx_measurement=mltransform_generate_vocab_batch +--input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt +--output_vocab=gs://temp-storage-for-perf-tests/mltransform/vocab_outputs/mltransform_generate_vocab_batch +--columns=text +--vocab_size=50000 +--min_frequency=1 +--lowercase=true +--tokenizer=whitespace +--oov_token= +--input_expand_factor=1 + diff --git a/.test-infra/tools/refresh_looker_metrics.py b/.test-infra/tools/refresh_looker_metrics.py index afd8ffa6f861..35122d5afe0a 100644 --- a/.test-infra/tools/refresh_looker_metrics.py +++ b/.test-infra/tools/refresh_looker_metrics.py @@ -44,7 +44,8 @@ ("85", ["268", "269", "270", "271", "272"]), # PyTorch Sentiment Batch DistilBERT base uncased ("86", ["284", "285", "286", "287", "288"]), # VLLM Batch Gemma ("96", ["270", "304", "305", "353", "354"]), # Table Row Inference Sklearn Batch - ("106", ["355", "356", "357", "358", "359"]) # Table Row Inference Sklearn Streaming + ("106", ["355", "356", "357", "358", "359"]), # Table Row Inference Sklearn Streaming + ("107", ["360", "361", "362", "363", "364"]), # MLTransform Generate Vocab Batch ] def get_look(id: str) -> models.Look: diff --git a/sdks/python/apache_beam/examples/ml_transform/README.md b/sdks/python/apache_beam/examples/ml_transform/README.md new file mode 100644 index 000000000000..8914180ca3a8 --- /dev/null +++ 
b/sdks/python/apache_beam/examples/ml_transform/README.md @@ -0,0 +1,130 @@ + + +# MLTransform Examples + +This directory contains Apache Beam examples for MLTransform pipelines. + +## MLTransform - Generate Vocab (Batch only) + +`mltransform_generate_vocab.py` builds a vocabulary artifact from batch input +rows using `MLTransform` + `ComputeAndApplyVocabulary`. + +### What it does + +1. Reads input rows from JSONL (`--input_file`) or BigQuery (`--input_table`). +2. Extracts specified columns (`--columns`). +3. Normalizes text (`trim`, optional lowercasing). +4. Tokenizes text (`whitespace` or `regex` tokenizer). +5. Runs `ComputeAndApplyVocabulary` with top-k and min-frequency constraints. +6. Ensures `--oov_token` is included first. +7. Writes the vocabulary as one token per line. + +### Required arguments + +- `--output_vocab` +- `--columns` +- and one of: + - `--input_file` + - `--input_table` + +### Optional arguments + +- `--vocab_size` (default: `50000`) +- `--min_frequency` (default: `1`) +- `--lowercase` (default: `true`) +- `--tokenizer` (`whitespace` or `regex`, default: `whitespace`) +- `--oov_token` (default: ``) +- `--input_expand_factor` (default: `1`, useful for perf/load testing) + +### Local batch example + +```sh +python -m apache_beam.examples.ml_transform.mltransform_generate_vocab \ + --input_file=/tmp/input.jsonl \ + --output_vocab=/tmp/vocab.txt \ + --columns=text,category \ + --vocab_size=5 \ + --min_frequency=1 \ + --lowercase=true \ + --tokenizer=whitespace \ + --oov_token= \ + --input_expand_factor=1 \ + --runner=DirectRunner +``` + +### Input format + +JSONL input with object rows, for example: + +```json +{"id":"1","text":"Beam beam ML pipeline"} +{"id":"2","text":"Beam pipeline dataflow"} +{"id":"3","text":"ML transform beam"} +{"id":"4","text":"vocab vocab vocab test"} +{"id":"5","text":"rare_token_once"} +{"id":"6","text":""} +{"id":"7","text":null} +``` + +The integration tests in `mltransform_generate_vocab_test.py` generate this 
+sample data programmatically. + +### Output format + +One token per line: + +1. `oov_token` first +2. remaining tokens follow the vocabulary order produced by + `ComputeAndApplyVocabulary`. + +Example output: + +```txt + +beam +ml +``` + +For this sample and config: + +```sh +--columns=text --min_frequency=2 --vocab_size=3 +``` + +the expected output is: + +```txt + +beam +vocab +ml +``` + +### Empty vocabulary behavior + +If all tokens are filtered out by `--min_frequency`, the pipeline writes only +the reserved `--oov_token` and logs a warning. + +### Additional test datasets + +Test data for happy path and null/empty/missing columns is generated inline in +`mltransform_generate_vocab_test.py`. + +### Performance testing pattern + +- Small local files: functional correctness and output-stability tests. +- Large GCS files (or moderate file + `--input_expand_factor`): throughput/cost + benchmarking on Dataflow. + diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py new file mode 100644 index 000000000000..7d246aab9da1 --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py @@ -0,0 +1,293 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Batch-only vocabulary generation pipeline using MLTransform. + +This pipeline creates a vocabulary artifact from one or more input columns. + +Key properties: +- Batch only (no streaming path). +- Vocabulary generation via MLTransform ComputeAndApplyVocabulary. +- Reserved OOV token is always written first. +- Output format: one token per line. +""" + +import argparse +import json +import logging +import re +import tempfile +from typing import Any + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary +from apache_beam.options.pipeline_options import PipelineOptions + +SUPPORTED_TOKENIZERS = ('whitespace', 'regex') +DEFAULT_REGEX_PATTERN = r"[A-Za-z0-9_]+" + + +def parse_bool_flag(value: str) -> bool: + value_lc = value.strip().lower() + if value_lc in ('1', 'true', 't', 'yes', 'y'): + return True + if value_lc in ('0', 'false', 'f', 'no', 'n'): + return False + raise ValueError( + f'Invalid boolean value {value!r}. Expected true/false style value.') + + +def normalize_text(value: Any, lowercase: bool = True) -> str: + if value is None: + return '' + text = str(value).strip() + if lowercase: + text = text.lower() + return text + + +def tokenize_text( + text: str, + tokenizer: str = 'whitespace', + regex_pattern: str = DEFAULT_REGEX_PATTERN) -> list[str]: + if not text: + return [] + if tokenizer == 'whitespace': + return [token for token in text.split() if token] + if tokenizer == 'regex': + return re.findall(regex_pattern, text) + raise ValueError( + f'Unsupported tokenizer {tokenizer!r}. 
' + f'Supported tokenizers: {", ".join(SUPPORTED_TOKENIZERS)}') + + +def _parse_json_line(line: str) -> dict[str, Any]: + try: + parsed = json.loads(line) + except json.JSONDecodeError: + # Treat plain-text rows as values for the default "text" column. + return {'text': line} + if not isinstance(parsed, dict): + raise ValueError( + f'Input JSON line must decode to an object, got: {parsed!r}') + return parsed + + +def _extract_column_values(row: dict[str, Any], + columns: list[str]) -> list[Any]: + values = [] + for col in columns: + if col not in row: + continue + val = row[col] + if val is None: + continue + if isinstance(val, list): + values.extend(val) + else: + values.append(val) + return values + + +def _tokenize_row_values( + values: list[Any], + lowercase: bool, + tokenizer: str, +) -> list[str]: + tokens: list[str] = [] + for value in values: + normalized = normalize_text(value, lowercase=lowercase) + tokens.extend(tokenize_text(normalized, tokenizer=tokenizer)) + return tokens + + +def _resolve_vocab_asset_path( + artifact_location: str, vocab_filename: str, column_name: str) -> str: + asset_name = f'{vocab_filename}_{column_name}' + pattern = ( + f'{artifact_location.rstrip("/")}' + f'/*/transform_fn/assets/{asset_name}') + matches = FileSystems.match([pattern])[0].metadata_list + if not matches: + raise ValueError( + f'Could not locate vocabulary artifact {asset_name!r} under ' + f'{artifact_location!r}.') + return matches[0].path + + +def _read_vocab_tokens(vocab_asset_path: str) -> list[str]: + tokens = [] + with FileSystems.open(vocab_asset_path) as f: + for raw_line in f: + token = raw_line.decode('utf-8').rstrip('\n') + if token: + tokens.append(token) + return tokens + + +def _write_vocab_file(output_path: str, tokens: list[str]) -> None: + with FileSystems.create(output_path) as f: + for token in tokens: + f.write((token + '\n').encode('utf-8')) + + +def parse_known_args(argv): + parser = argparse.ArgumentParser( + description='Generate vocabulary 
from batch input with MLTransform.') + parser.add_argument('--input_file', help='Input JSONL file path.') + parser.add_argument( + '--input_table', + help='Input BigQuery table path in PROJECT:DATASET.TABLE format.') + parser.add_argument('--output_vocab', help='Output vocab file prefix/path.') + parser.add_argument( + '--columns', + help='Comma-separated source columns to include in vocabulary.') + parser.add_argument( + '--vocab_size', + type=int, + default=50000, + help='Maximum vocabulary size (top-K by frequency).') + parser.add_argument( + '--min_frequency', + type=int, + default=1, + help='Minimum token frequency required to keep token.') + parser.add_argument( + '--lowercase', + default='true', + help='Whether to lowercase text before tokenization (default: true).') + parser.add_argument( + '--tokenizer', + default='whitespace', + help='Tokenizer strategy: whitespace or regex.') + parser.add_argument( + '--oov_token', + default='', + help='Reserved out-of-vocabulary token to write first.') + parser.add_argument( + '--input_expand_factor', + type=int, + default=1, + help=( + 'Batch-only: repeat each input line this many times to scale volume ' + 'for load/perf testing.')) + parser.add_argument( + '--artifact_location', + default='', + help=( + 'Artifact directory for MLTransform output. 
If empty, a temporary '
+          'local directory is used.'))
+  return parser.parse_known_args(argv)
+
+
+def validate_args(args) -> list[str]:
+  has_input_file = bool(args.input_file)
+  has_input_table = bool(args.input_table)
+  if not has_input_file and not has_input_table:
+    raise ValueError('One of --input_file or --input_table is required.')
+  if has_input_file and has_input_table:
+    raise ValueError('Use exactly one of --input_file or --input_table.')
+  if not args.output_vocab:
+    raise ValueError('--output_vocab is required.')
+  if not args.columns:
+    raise ValueError('--columns is required.')
+  if args.vocab_size is None or args.vocab_size <= 0:
+    raise ValueError('--vocab_size must be > 0.')
+  if args.min_frequency is None or args.min_frequency < 1:
+    raise ValueError('--min_frequency must be >= 1.')
+  if args.tokenizer not in SUPPORTED_TOKENIZERS:
+    raise ValueError(
+        f'Unsupported tokenizer {args.tokenizer!r}. '
+        f'Supported tokenizers: {", ".join(SUPPORTED_TOKENIZERS)}')
+  if args.oov_token is None:
+    raise ValueError('--oov_token must not be None; empty string is allowed.')
+  if args.input_expand_factor is None or args.input_expand_factor < 1:
+    raise ValueError('--input_expand_factor must be >= 1.')
+  return [col.strip() for col in args.columns.split(',') if col.strip()]
+
+
+def run(argv=None, test_pipeline=None):
+  known_args, pipeline_args = parse_known_args(argv)
+  columns = validate_args(known_args)
+  lowercase = parse_bool_flag(known_args.lowercase)
+  artifact_location = known_args.artifact_location or tempfile.mkdtemp(
+      prefix='mltransform_generate_vocab_artifacts_')
+
+  options = PipelineOptions(pipeline_args)
+  pipeline = test_pipeline or beam.Pipeline(options=options)
+
+  if known_args.input_file:
+    lines = (
+        pipeline
+        | 'ReadInputFile' >> beam.io.ReadFromText(known_args.input_file))
+    if known_args.input_expand_factor > 1:
+      lines = (
+          lines
+          | 'ExpandInputForPerf' >> beam.FlatMap(
+              lambda line, n: [line] * n, known_args.input_expand_factor))
+    rows = lines | 
'ParseJSON' >> beam.Map(_parse_json_line) + else: + rows = pipeline | 'ReadInputTable' >> beam.io.ReadFromBigQuery( + table=known_args.input_table) + + token_lists = ( + rows + | 'ExtractColumnValues' >> + beam.Map(lambda row: _extract_column_values(row, columns)) + | 'TokenizeRowValues' >> beam.Map( + lambda values: _tokenize_row_values( + values, lowercase=lowercase, tokenizer=known_args.tokenizer)) + | 'DropEmptyTokenLists' >> beam.Filter(bool)) + + _ = ( + token_lists + | 'MLTransformInput' >> beam.Map(lambda tokens: {'tokens': tokens}) + | 'ApplyMLTransform' >> + MLTransform(write_artifact_location=artifact_location).with_transform( + ComputeAndApplyVocabulary( + columns=['tokens'], + top_k=known_args.vocab_size, + frequency_threshold=known_args.min_frequency, + vocab_filename='vocab')) + | 'ExtractTransformedTokens' >> beam.Map(lambda row: row.tokens) + | 'FlattenTokens' >> beam.FlatMap(list) + | 'DropEmptyTokens' >> beam.Filter(bool)) + + result = pipeline.run() + result.wait_until_finish() + + vocab_tokens = _read_vocab_tokens( + _resolve_vocab_asset_path( + artifact_location=artifact_location, + vocab_filename='vocab', + column_name='tokens')) + output_tokens = [known_args.oov_token] + output_tokens.extend( + token for token in vocab_tokens if token != known_args.oov_token) + if len(output_tokens) == 1: + logging.warning( + 'No tokens remained after filtering; writing only reserved token %r.', + known_args.oov_token) + _write_vocab_file(known_args.output_vocab, output_tokens) + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py new file mode 100644 index 000000000000..4b7c540295ae --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py @@ -0,0 +1,214 @@ +# +# Licensed to the Apache Software 
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import os +import tempfile +import unittest + +try: + from apache_beam.examples.ml_transform import mltransform_generate_vocab +except ImportError: # pylint: disable=bare-except + raise unittest.SkipTest('tensorflow_transform is not installed.') + + +class MLTransformGenerateVocabUnitTest(unittest.TestCase): + def test_normalize_and_tokenize_whitespace(self): + text = mltransform_generate_vocab.normalize_text(' Hello Beam ', True) + self.assertEqual(text, 'hello beam') + tokens = mltransform_generate_vocab.tokenize_text(text, 'whitespace') + self.assertEqual(tokens, ['hello', 'beam']) + + def test_tokenize_regex(self): + tokens = mltransform_generate_vocab.tokenize_text( + 'beam,beam! 
123', tokenizer='regex') + self.assertEqual(tokens, ['beam', 'beam', '123']) + + def test_null_and_empty_handling_helpers(self): + normalized_none = mltransform_generate_vocab.normalize_text(None, True) + self.assertEqual(normalized_none, '') + self.assertEqual( + mltransform_generate_vocab._tokenize_row_values( + [None, '', ' ', 'Beam'], lowercase=True, tokenizer='whitespace'), + ['beam']) + + +class MLTransformGenerateVocabCliValidationTest(unittest.TestCase): + def test_missing_required_args(self): + args, _ = mltransform_generate_vocab.parse_known_args([]) + with self.assertRaisesRegex(ValueError, 'input_file or --input_table'): + mltransform_generate_vocab.validate_args(args) + + def test_invalid_numeric_values(self): + args, _ = mltransform_generate_vocab.parse_known_args([ + '--input_file=a.jsonl', + '--output_vocab=/tmp/vocab', + '--columns=text', + '--vocab_size=0', + '--min_frequency=0', + ]) + with self.assertRaisesRegex(ValueError, 'vocab_size'): + mltransform_generate_vocab.validate_args(args) + + def test_invalid_tokenizer(self): + args, _ = mltransform_generate_vocab.parse_known_args([ + '--input_file=a.jsonl', + '--output_vocab=/tmp/vocab', + '--columns=text', + '--tokenizer=custom', + ]) + with self.assertRaisesRegex(ValueError, 'Unsupported tokenizer'): + mltransform_generate_vocab.validate_args(args) + + def test_invalid_input_expand_factor(self): + args, _ = mltransform_generate_vocab.parse_known_args([ + '--input_file=a.jsonl', + '--output_vocab=/tmp/vocab', + '--columns=text', + '--input_expand_factor=0', + ]) + with self.assertRaisesRegex(ValueError, 'input_expand_factor'): + mltransform_generate_vocab.validate_args(args) + + +class MLTransformGenerateVocabIntegrationTest(unittest.TestCase): + def test_batch_pipeline_exact_output_order(self): + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, 'input.jsonl') + output_prefix = os.path.join(tmpdir, 'vocab.txt') + + rows = [ + { + 'id': '1', 'text': 'Beam beam ML 
pipeline' + }, + { + 'id': '2', 'text': 'Beam pipeline dataflow' + }, + { + 'id': '3', 'text': 'ML transform beam' + }, + { + 'id': '4', 'text': 'vocab vocab vocab test' + }, + { + 'id': '5', 'text': 'rare_token_once' + }, + { + 'id': '6', 'text': '' + }, + { + 'id': '7', 'text': None + }, + ] + with open(input_path, 'w', encoding='utf-8') as f: + for row in rows: + f.write(json.dumps(row) + '\n') + + mltransform_generate_vocab.run([ + f'--input_file={input_path}', + f'--output_vocab={output_prefix}', + '--columns=text', + '--vocab_size=3', + '--min_frequency=2', + '--lowercase=true', + '--tokenizer=whitespace', + '--oov_token=', + '--runner=DirectRunner', + ]) + + output_path = output_prefix + with open(output_path, 'r', encoding='utf-8') as f: + output_tokens = [line.rstrip('\n') for line in f] + + self.assertEqual(output_tokens[0], '') + self.assertEqual(set(output_tokens[1:]), {'beam', 'vocab', 'ml'}) + self.assertEqual(len(output_tokens), 4) + + def test_output_is_stable_across_runs(self): + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, 'input.jsonl') + output_prefix = os.path.join(tmpdir, 'vocab.txt') + rows = [ + { + 'text': 'apple banana' + }, + { + 'text': 'banana apple' + }, + { + 'text': 'cat dog' + }, + { + 'text': 'dog cat' + }, + ] + with open(input_path, 'w', encoding='utf-8') as f: + for row in rows: + f.write(json.dumps(row) + '\n') + + common_args = [ + f'--input_file={input_path}', + '--columns=text', + '--vocab_size=4', + '--min_frequency=2', + '--lowercase=true', + '--tokenizer=whitespace', + '--oov_token=', + '--runner=DirectRunner', + ] + + mltransform_generate_vocab.run( + common_args + [f'--output_vocab={output_prefix}']) + + output_path = output_prefix + with open(output_path, 'r', encoding='utf-8') as f: + first_run_tokens = [line.rstrip('\n') for line in f] + + output_prefix_second = os.path.join(tmpdir, 'vocab_second.txt') + mltransform_generate_vocab.run( + common_args + 
[f'--output_vocab={output_prefix_second}']) + + with open(output_prefix_second, 'r', encoding='utf-8') as f: + second_run_tokens = [line.rstrip('\n') for line in f] + + self.assertEqual(first_run_tokens, second_run_tokens) + + def test_empty_filtered_result_writes_only_reserved_token(self): + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, 'input.jsonl') + output_prefix = os.path.join(tmpdir, 'vocab.txt') + with open(input_path, 'w', encoding='utf-8') as f: + f.write(json.dumps({'text': 'beam'}) + '\n') + + mltransform_generate_vocab.run([ + f'--input_file={input_path}', + f'--output_vocab={output_prefix}', + '--columns=text', + '--vocab_size=10', + '--min_frequency=2', + '--oov_token=', + '--runner=DirectRunner', + ]) + + output_path = output_prefix + with open(output_path, 'r', encoding='utf-8') as f: + output_tokens = [line.rstrip('\n') for line in f] + self.assertEqual(output_tokens, ['']) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt b/sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt new file mode 100644 index 000000000000..bb9cf9962bd9 --- /dev/null +++ b/sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Keep this benchmark requirements focused and deterministic for Dataflow +# workers. MLTransform TFT operations require a consistent TensorFlow Transform +# dependency set; otherwise workers can crash-loop with pandas/numpy ABI +# mismatches during SDK harness startup. +google-cloud-monitoring>=2.27.0 +tensorflow_transform>=1.14.0,<1.15.0 +tensorflow-metadata>=1.14.0,<1.15.0 +tfx-bsl>=1.14.0,<1.15.0 +# tfx-bsl / tensorflow-transform rely on pandas 1.x with numpy 1.x. +numpy<2 +pandas<2 +# tensorflow-transform expects dill but does not hard-pin it. +dill + diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py new file mode 100644 index 000000000000..5d3d6ef22d88 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Benchmark test for the batch-only MLTransform Generate Vocab pipeline.""" + +import logging + +from apache_beam.examples.ml_transform import mltransform_generate_vocab +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark + + +class MLTransformGenerateVocabBenchmarkTest(DataflowCostBenchmark): + """Runs the MLTransform vocab generation pipeline as a cost benchmark.""" + def __init__(self): + # Namespace used by benchmark dashboards. + self.metrics_namespace = 'BeamML_MLTransformVocab' + # Monitor token throughput after filtering empty values. + super().__init__( + metrics_namespace=self.metrics_namespace, + is_streaming=False, + pcollection='DropEmptyTokens.out0') + + def test(self): + """Execute the MLTransform vocab generation pipeline for benchmarking.""" + extra_opts = { + 'input_file': self.pipeline.get_option('input_file'), + 'output_vocab': self.pipeline.get_option('output_vocab'), + 'artifact_location': self.pipeline.get_option('artifact_location'), + 'columns': self.pipeline.get_option('columns'), + 'vocab_size': self.pipeline.get_option('vocab_size'), + 'min_frequency': self.pipeline.get_option('min_frequency'), + 'lowercase': self.pipeline.get_option('lowercase'), + 'tokenizer': self.pipeline.get_option('tokenizer'), + 'oov_token': self.pipeline.get_option('oov_token'), + 'input_expand_factor': self.pipeline.get_option('input_expand_factor'), + } + + self.result = mltransform_generate_vocab.run( + self.pipeline.get_full_options_as_args(**extra_opts)) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + MLTransformGenerateVocabBenchmarkTest().run() diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py index 3df96c03a8a2..635404682b97 100644 --- a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py +++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py 
@@ -193,6 +193,29 @@ def _get_throughput_metrics( """Query Cloud Monitoring for per-PCollection throughput.""" name = ( pcollection_name if pcollection_name is not None else self.pcollection) + + def _point_numeric_value(point) -> float: + value = point.value + # point.value is proto-plus, so use the underlying protobuf oneof. + raw_value = getattr(value, '_pb', None) + if raw_value is not None: + active_field = raw_value.WhichOneof('value') + if active_field == 'double_value': + return float(value.double_value) + if active_field == 'int64_value': + return float(value.int64_value) + if active_field == 'distribution_value': + # Use aligned mean for distribution-valued points. + distribution = value.distribution_value + if distribution.count > 0: + return float(distribution.mean) + return 0.0 + if active_field == 'money_value': + money = value.money_value + nanos = getattr(money, 'nanos', 0) or 0 + return float(money.units) + (float(nanos) / 1_000_000_000.0) + return 0.0 + interval = monitoring_v3.TimeInterval( start_time=start_time, end_time=end_time) aggregation = monitoring_v3.Aggregation( @@ -221,7 +244,7 @@ def _get_throughput_metrics( for key, req in requests.items(): time_series = self.monitoring_client.list_time_series(request=req) values = [ - point.value.double_value for series in time_series + _point_numeric_value(point) for series in time_series for point in series.points ] metrics[f"AvgThroughput{key}"] = sum(values) / len( diff --git a/website/www/site/content/en/performance/_index.md b/website/www/site/content/en/performance/_index.md index 1624c58efe2e..350524a3c86d 100644 --- a/website/www/site/content/en/performance/_index.md +++ b/website/www/site/content/en/performance/_index.md @@ -59,3 +59,4 @@ See the following pages for performance measures recorded when running various B - [TensorFlow MNIST Image Classification](/performance/tensorflowmnist) - [VLLM Gemma Batch Completion Tesla T4 GPU](/performance/vllmgemmabatchtesla) - [Table Row Inference 
Sklearn Batch](/performance/tablerowinference) +- [MLTransform Generate Vocab (batch)](/performance/mltransformvocab) diff --git a/website/www/site/content/en/performance/mltransformvocab/_index.md b/website/www/site/content/en/performance/mltransformvocab/_index.md new file mode 100644 index 000000000000..4aac7f08e83d --- /dev/null +++ b/website/www/site/content/en/performance/mltransformvocab/_index.md @@ -0,0 +1,37 @@ +--- +title: "MLTransform Generate Vocab Performance" +--- + + + +# MLTransform Generate Vocab Performance + +**Model**: Apache Beam MLTransform — ComputeAndApplyVocabulary (Generate Vocab) + TFIDF +**Accelerator**: CPU (batch) +**Host**: MLTransform vocab pipeline on Dataflow (batch) + +This batch pipeline computes a vocabulary from input text columns using +`ComputeAndApplyVocabulary`, then produces TF-IDF outputs using `TFIDF`. + +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available here: +https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="mltransformvocab" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="mltransformvocab" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? 
+ +{{< performance_looks io="mltransformvocab" read_or_write="write" section="date" >}} + diff --git a/website/www/site/data/performance.yaml b/website/www/site/data/performance.yaml index 8841bbb58ecb..9725a4af99a9 100644 --- a/website/www/site/data/performance.yaml +++ b/website/www/site/data/performance.yaml @@ -283,3 +283,19 @@ looks: title: AvgThroughputBytesPerSec by Version - id: P7wKZy6tQFWbbDfm4HzfCJnsQrVgfGsJ title: AvgThroughputElementsPerSec by Version + mltransformvocab: + write: + folder: 107 + cost: + - id: CccNCHQ23Js2tmxTDYVnjxwfx2zSFSjY + title: RunTime and EstimatedCost + date: + - id: n7Gn7c7KSW52ZgXCQXndHgWtjVPK8W33 + title: AvgThroughputBytesPerSec by Date + - id: 3tcg6cSh6tBg6FmpTxgzyMtvFqWKkQn3 + title: AvgThroughputElementsPerSec by Date + version: + - id: tPYwJngPDBsKjK3DNgGHGsCyTpxdfmTB + title: AvgThroughputBytesPerSec by Version + - id: cC75NnCbQT3mQmKVHtDxzptXpwPb64qz + title: AvgThroughputElementsPerSec by Version From 0b010638df1fe5c3b47b98fd0d1053e9eefdc502 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Mon, 4 May 2026 22:56:32 +0300 Subject: [PATCH 2/5] Refactor vocab pipeline to use direct MLTransform vocabulary pattern --- ...aflow_MLTransform_Generate_Vocab_Batch.txt | 2 - .../examples/ml_transform/README.md | 24 +---- .../mltransform_generate_vocab.py | 95 +++++-------------- .../mltransform_generate_vocab_test.py | 43 +++------ .../mltransform_generate_vocab_benchmark.py | 2 - 5 files changed, 40 insertions(+), 126 deletions(-) diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt index 5d91f6ce08b2..22e14643a6ab 100644 --- a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt +++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_Generate_Vocab_Batch.txt @@ -38,7 +38,5 @@ --vocab_size=50000 --min_frequency=1 --lowercase=true ---tokenizer=whitespace ---oov_token= --input_expand_factor=1 diff --git a/sdks/python/apache_beam/examples/ml_transform/README.md b/sdks/python/apache_beam/examples/ml_transform/README.md index 8914180ca3a8..a47fba1ff0c8 100644 --- a/sdks/python/apache_beam/examples/ml_transform/README.md +++ b/sdks/python/apache_beam/examples/ml_transform/README.md @@ -25,11 +25,10 @@ rows using `MLTransform` + `ComputeAndApplyVocabulary`. 1. Reads input rows from JSONL (`--input_file`) or BigQuery (`--input_table`). 2. Extracts specified columns (`--columns`). -3. Normalizes text (`trim`, optional lowercasing). -4. Tokenizes text (`whitespace` or `regex` tokenizer). -5. Runs `ComputeAndApplyVocabulary` with top-k and min-frequency constraints. -6. Ensures `--oov_token` is included first. -7. Writes the vocabulary as one token per line. +3. Normalizes and combines text values (`trim`, optional lowercasing). +4. Runs `ComputeAndApplyVocabulary` with top-k and min-frequency constraints + using space-delimited token splitting. +5. Writes the vocabulary as one token per line. ### Required arguments @@ -44,8 +43,6 @@ rows using `MLTransform` + `ComputeAndApplyVocabulary`. - `--vocab_size` (default: `50000`) - `--min_frequency` (default: `1`) - `--lowercase` (default: `true`) -- `--tokenizer` (`whitespace` or `regex`, default: `whitespace`) -- `--oov_token` (default: ``) - `--input_expand_factor` (default: `1`, useful for perf/load testing) ### Local batch example @@ -58,8 +55,6 @@ python -m apache_beam.examples.ml_transform.mltransform_generate_vocab \ --vocab_size=5 \ --min_frequency=1 \ --lowercase=true \ - --tokenizer=whitespace \ - --oov_token= \ --input_expand_factor=1 \ --runner=DirectRunner ``` @@ -85,14 +80,11 @@ sample data programmatically. One token per line: -1. 
`oov_token` first -2. remaining tokens follow the vocabulary order produced by - `ComputeAndApplyVocabulary`. +1. tokens follow the vocabulary order produced by `ComputeAndApplyVocabulary`. Example output: ```txt - beam ml ``` @@ -106,17 +98,11 @@ For this sample and config: the expected output is: ```txt - beam vocab ml ``` -### Empty vocabulary behavior - -If all tokens are filtered out by `--min_frequency`, the pipeline writes only -the reserved `--oov_token` and logs a warning. - ### Additional test datasets Test data for happy path and null/empty/missing columns is generated inline in diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py index 7d246aab9da1..458bc3e83a0c 100644 --- a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py @@ -22,14 +22,12 @@ Key properties: - Batch only (no streaming path). - Vocabulary generation via MLTransform ComputeAndApplyVocabulary. -- Reserved OOV token is always written first. - Output format: one token per line. 
""" import argparse import json import logging -import re import tempfile from typing import Any @@ -39,10 +37,6 @@ from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary from apache_beam.options.pipeline_options import PipelineOptions -SUPPORTED_TOKENIZERS = ('whitespace', 'regex') -DEFAULT_REGEX_PATTERN = r"[A-Za-z0-9_]+" - - def parse_bool_flag(value: str) -> bool: value_lc = value.strip().lower() if value_lc in ('1', 'true', 't', 'yes', 'y'): @@ -62,21 +56,6 @@ def normalize_text(value: Any, lowercase: bool = True) -> str: return text -def tokenize_text( - text: str, - tokenizer: str = 'whitespace', - regex_pattern: str = DEFAULT_REGEX_PATTERN) -> list[str]: - if not text: - return [] - if tokenizer == 'whitespace': - return [token for token in text.split() if token] - if tokenizer == 'regex': - return re.findall(regex_pattern, text) - raise ValueError( - f'Unsupported tokenizer {tokenizer!r}. ' - f'Supported tokenizers: {", ".join(SUPPORTED_TOKENIZERS)}') - - def _parse_json_line(line: str) -> dict[str, Any]: try: parsed = json.loads(line) @@ -89,9 +68,8 @@ def _parse_json_line(line: str) -> dict[str, Any]: return parsed -def _extract_column_values(row: dict[str, Any], - columns: list[str]) -> list[Any]: - values = [] +def _extract_column_values(row: dict[str, Any], columns: list[str]) -> list[str]: + values: list[str] = [] for col in columns: if col not in row: continue @@ -99,22 +77,20 @@ def _extract_column_values(row: dict[str, Any], if val is None: continue if isinstance(val, list): - values.extend(val) + values.extend(str(item) for item in val if item is not None) else: - values.append(val) + values.append(str(val)) return values -def _tokenize_row_values( - values: list[Any], - lowercase: bool, - tokenizer: str, -) -> list[str]: - tokens: list[str] = [] - for value in values: - normalized = normalize_text(value, lowercase=lowercase) - tokens.extend(tokenize_text(normalized, tokenizer=tokenizer)) - return tokens +def _build_vocab_text(row: 
dict[str, Any], columns: list[str], + lowercase: bool) -> str: + values = _extract_column_values(row, columns) + normalized_values = [ + normalize_text(value, lowercase=lowercase) for value in values + ] + non_empty_values = [value for value in normalized_values if value] + return ' '.join(non_empty_values) def _resolve_vocab_asset_path( @@ -171,15 +147,7 @@ def parse_known_args(argv): parser.add_argument( '--lowercase', default='true', - help='Whether to lowercase text before tokenization (default: true).') - parser.add_argument( - '--tokenizer', - default='whitespace', - help='Tokenizer strategy: whitespace or regex.') - parser.add_argument( - '--oov_token', - default='', - help='Reserved out-of-vocabulary token to write first.') + help='Whether to lowercase text before vocabulary generation.') parser.add_argument( '--input_expand_factor', type=int, @@ -211,12 +179,6 @@ def validate_args(args) -> list[str]: raise ValueError('--vocab_size must be > 0.') if args.min_frequency is None or args.min_frequency < 1: raise ValueError('--min_frequency must be >= 1.') - if args.tokenizer not in SUPPORTED_TOKENIZERS: - raise ValueError( - f'Unsupported tokenizer {args.tokenizer!r}. 
' - f'Supported tokenizers: {", ".join(SUPPORTED_TOKENIZERS)}') - if not args.oov_token: - raise ValueError('--oov_token must be non-empty.') if args.input_expand_factor is None or args.input_expand_factor < 1: raise ValueError('--input_expand_factor must be >= 1.') return [col.strip() for col in args.columns.split(',') if col.strip()] @@ -246,26 +208,24 @@ def run(argv=None, test_pipeline=None): rows = pipeline | 'ReadInputTable' >> beam.io.ReadFromBigQuery( table=known_args.input_table) - token_lists = ( + vocab_text = ( rows - | 'ExtractColumnValues' >> - beam.Map(lambda row: _extract_column_values(row, columns)) - | 'TokenizeRowValues' >> beam.Map( - lambda values: _tokenize_row_values( - values, lowercase=lowercase, tokenizer=known_args.tokenizer)) - | 'DropEmptyTokenLists' >> beam.Filter(bool)) + | 'BuildVocabText' >> beam.Map( + lambda row: _build_vocab_text(row, columns, lowercase)) + | 'DropEmptyText' >> beam.Filter(bool)) _ = ( - token_lists - | 'MLTransformInput' >> beam.Map(lambda tokens: {'tokens': tokens}) + vocab_text + | 'MLTransformInput' >> beam.Map(lambda text: {'text': text}) | 'ApplyMLTransform' >> MLTransform(write_artifact_location=artifact_location).with_transform( ComputeAndApplyVocabulary( - columns=['tokens'], + columns=['text'], top_k=known_args.vocab_size, frequency_threshold=known_args.min_frequency, + split_string_by_delimiter=' ', vocab_filename='vocab')) - | 'ExtractTransformedTokens' >> beam.Map(lambda row: row.tokens) + | 'ExtractTransformedTokens' >> beam.Map(lambda row: row.text) | 'FlattenTokens' >> beam.FlatMap(list) | 'DropEmptyTokens' >> beam.Filter(bool)) @@ -276,15 +236,8 @@ def run(argv=None, test_pipeline=None): _resolve_vocab_asset_path( artifact_location=artifact_location, vocab_filename='vocab', - column_name='tokens')) - output_tokens = [known_args.oov_token] - output_tokens.extend( - token for token in vocab_tokens if token != known_args.oov_token) - if len(output_tokens) == 1: - logging.warning( - 'No tokens 
remained after filtering; writing only reserved token %r.', - known_args.oov_token) - _write_vocab_file(known_args.output_vocab, output_tokens) + column_name='text')) + _write_vocab_file(known_args.output_vocab, vocab_tokens) return result diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py index 4b7c540295ae..3de9b79ef460 100644 --- a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py @@ -22,29 +22,24 @@ try: from apache_beam.examples.ml_transform import mltransform_generate_vocab -except ImportError: # pylint: disable=bare-except +except ImportError: raise unittest.SkipTest('tensorflow_transform is not installed.') class MLTransformGenerateVocabUnitTest(unittest.TestCase): - def test_normalize_and_tokenize_whitespace(self): + def test_normalize_text(self): text = mltransform_generate_vocab.normalize_text(' Hello Beam ', True) self.assertEqual(text, 'hello beam') - tokens = mltransform_generate_vocab.tokenize_text(text, 'whitespace') - self.assertEqual(tokens, ['hello', 'beam']) - - def test_tokenize_regex(self): - tokens = mltransform_generate_vocab.tokenize_text( - 'beam,beam! 
123', tokenizer='regex') - self.assertEqual(tokens, ['beam', 'beam', '123']) def test_null_and_empty_handling_helpers(self): normalized_none = mltransform_generate_vocab.normalize_text(None, True) self.assertEqual(normalized_none, '') self.assertEqual( - mltransform_generate_vocab._tokenize_row_values( - [None, '', ' ', 'Beam'], lowercase=True, tokenizer='whitespace'), - ['beam']) + mltransform_generate_vocab._build_vocab_text( + { + 'text': ['Beam', None, ' ', 'Flow'] + }, ['text'], lowercase=True), + 'beam flow') class MLTransformGenerateVocabCliValidationTest(unittest.TestCase): @@ -64,16 +59,6 @@ def test_invalid_numeric_values(self): with self.assertRaisesRegex(ValueError, 'vocab_size'): mltransform_generate_vocab.validate_args(args) - def test_invalid_tokenizer(self): - args, _ = mltransform_generate_vocab.parse_known_args([ - '--input_file=a.jsonl', - '--output_vocab=/tmp/vocab', - '--columns=text', - '--tokenizer=custom', - ]) - with self.assertRaisesRegex(ValueError, 'Unsupported tokenizer'): - mltransform_generate_vocab.validate_args(args) - def test_invalid_input_expand_factor(self): args, _ = mltransform_generate_vocab.parse_known_args([ '--input_file=a.jsonl', @@ -125,8 +110,6 @@ def test_batch_pipeline_exact_output_order(self): '--vocab_size=3', '--min_frequency=2', '--lowercase=true', - '--tokenizer=whitespace', - '--oov_token=', '--runner=DirectRunner', ]) @@ -134,9 +117,8 @@ def test_batch_pipeline_exact_output_order(self): with open(output_path, 'r', encoding='utf-8') as f: output_tokens = [line.rstrip('\n') for line in f] - self.assertEqual(output_tokens[0], '') - self.assertEqual(set(output_tokens[1:]), {'beam', 'vocab', 'ml'}) - self.assertEqual(len(output_tokens), 4) + self.assertEqual(set(output_tokens), {'beam', 'vocab', 'ml'}) + self.assertEqual(len(output_tokens), 3) def test_output_is_stable_across_runs(self): with tempfile.TemporaryDirectory() as tmpdir: @@ -166,8 +148,6 @@ def test_output_is_stable_across_runs(self): '--vocab_size=4', 
'--min_frequency=2', '--lowercase=true', - '--tokenizer=whitespace', - '--oov_token=', '--runner=DirectRunner', ] @@ -187,7 +167,7 @@ def test_output_is_stable_across_runs(self): self.assertEqual(first_run_tokens, second_run_tokens) - def test_empty_filtered_result_writes_only_reserved_token(self): + def test_empty_filtered_result_writes_empty_vocab(self): with tempfile.TemporaryDirectory() as tmpdir: input_path = os.path.join(tmpdir, 'input.jsonl') output_prefix = os.path.join(tmpdir, 'vocab.txt') @@ -200,14 +180,13 @@ def test_empty_filtered_result_writes_only_reserved_token(self): '--columns=text', '--vocab_size=10', '--min_frequency=2', - '--oov_token=', '--runner=DirectRunner', ]) output_path = output_prefix with open(output_path, 'r', encoding='utf-8') as f: output_tokens = [line.rstrip('\n') for line in f] - self.assertEqual(output_tokens, ['']) + self.assertEqual(output_tokens, []) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py index 5d3d6ef22d88..8da57fc4a5d7 100644 --- a/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py +++ b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_generate_vocab_benchmark.py @@ -44,8 +44,6 @@ def test(self): 'vocab_size': self.pipeline.get_option('vocab_size'), 'min_frequency': self.pipeline.get_option('min_frequency'), 'lowercase': self.pipeline.get_option('lowercase'), - 'tokenizer': self.pipeline.get_option('tokenizer'), - 'oov_token': self.pipeline.get_option('oov_token'), 'input_expand_factor': self.pipeline.get_option('input_expand_factor'), } From 652e3b2f85ea2b4ee711f5102b330581360818ab Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 6 May 2026 11:08:52 +0300 Subject: [PATCH 3/5] fixed formatter --- .../ml_transform/mltransform_generate_vocab.py | 12 +++++++----- 
.../ml_transform/mltransform_generate_vocab_test.py | 4 +--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py index 458bc3e83a0c..f1623eacab9d 100644 --- a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py @@ -37,6 +37,7 @@ from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary from apache_beam.options.pipeline_options import PipelineOptions + def parse_bool_flag(value: str) -> bool: value_lc = value.strip().lower() if value_lc in ('1', 'true', 't', 'yes', 'y'): @@ -68,7 +69,8 @@ def _parse_json_line(line: str) -> dict[str, Any]: return parsed -def _extract_column_values(row: dict[str, Any], columns: list[str]) -> list[str]: +def _extract_column_values(row: dict[str, Any], + columns: list[str]) -> list[str]: values: list[str] = [] for col in columns: if col not in row: @@ -83,8 +85,8 @@ def _extract_column_values(row: dict[str, Any], columns: list[str]) -> list[str] return values -def _build_vocab_text(row: dict[str, Any], columns: list[str], - lowercase: bool) -> str: +def _build_vocab_text( + row: dict[str, Any], columns: list[str], lowercase: bool) -> str: values = _extract_column_values(row, columns) normalized_values = [ normalize_text(value, lowercase=lowercase) for value in values @@ -210,8 +212,8 @@ def run(argv=None, test_pipeline=None): vocab_text = ( rows - | 'BuildVocabText' >> beam.Map( - lambda row: _build_vocab_text(row, columns, lowercase)) + | 'BuildVocabText' >> + beam.Map(lambda row: _build_vocab_text(row, columns, lowercase)) | 'DropEmptyText' >> beam.Filter(bool)) _ = ( diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py index 
3de9b79ef460..3e29bd3aae10 100644 --- a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab_test.py @@ -36,9 +36,7 @@ def test_null_and_empty_handling_helpers(self): self.assertEqual(normalized_none, '') self.assertEqual( mltransform_generate_vocab._build_vocab_text( - { - 'text': ['Beam', None, ' ', 'Flow'] - }, ['text'], lowercase=True), + {'text': ['Beam', None, ' ', 'Flow']}, ['text'], lowercase=True), 'beam flow') From 041bea2fa4fd156973af0ea27201b983c20097f5 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Fri, 8 May 2026 13:33:15 +0300 Subject: [PATCH 4/5] Fix vocab index filtering and MLTransform docs --- .../examples/ml_transform/mltransform_generate_vocab.py | 2 +- .../site/content/en/performance/mltransformvocab/_index.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py index f1623eacab9d..bc3cc0ac4f6b 100644 --- a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py @@ -229,7 +229,7 @@ def run(argv=None, test_pipeline=None): vocab_filename='vocab')) | 'ExtractTransformedTokens' >> beam.Map(lambda row: row.text) | 'FlattenTokens' >> beam.FlatMap(list) - | 'DropEmptyTokens' >> beam.Filter(bool)) + | 'DropEmptyTokens' >> beam.Filter(lambda token: token is not None)) result = pipeline.run() result.wait_until_finish() diff --git a/website/www/site/content/en/performance/mltransformvocab/_index.md b/website/www/site/content/en/performance/mltransformvocab/_index.md index 4aac7f08e83d..f7844a5a09ae 100644 --- a/website/www/site/content/en/performance/mltransformvocab/_index.md +++ b/website/www/site/content/en/performance/mltransformvocab/_index.md @@ -11,12 +11,12 @@ 
http://www.apache.org/licenses/LICENSE-2.0 # MLTransform Generate Vocab Performance -**Model**: Apache Beam MLTransform — ComputeAndApplyVocabulary (Generate Vocab) + TFIDF +**Model**: Apache Beam MLTransform — ComputeAndApplyVocabulary (Generate Vocab) **Accelerator**: CPU (batch) **Host**: MLTransform vocab pipeline on Dataflow (batch) This batch pipeline computes a vocabulary from input text columns using -`ComputeAndApplyVocabulary`, then produces TF-IDF outputs using `TFIDF`. +`ComputeAndApplyVocabulary` and writes vocabulary artifacts for downstream use. See the [glossary](/performance/glossary) for definitions. From f82a8ff70afe9d57cc75a5a1f386695732034dbc Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Fri, 8 May 2026 18:15:30 +0300 Subject: [PATCH 5/5] Remove unnecessary token filtering in vocab pipeline --- .../examples/ml_transform/mltransform_generate_vocab.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py index bc3cc0ac4f6b..69cab94e3581 100644 --- a/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_generate_vocab.py @@ -228,8 +228,7 @@ def run(argv=None, test_pipeline=None): split_string_by_delimiter=' ', vocab_filename='vocab')) | 'ExtractTransformedTokens' >> beam.Map(lambda row: row.text) - | 'FlattenTokens' >> beam.FlatMap(list) - | 'DropEmptyTokens' >> beam.Filter(lambda token: token is not None)) + | 'FlattenTokens' >> beam.FlatMap(list)) result = pipeline.run() result.wait_until_finish()