From 56fd97f4f9a778e4e7bf4a9ec452190ab8d87352 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 10 Jun 2026 15:47:25 -0400 Subject: [PATCH 1/2] =?UTF-8?q?Revert=20"[Python]=20Honor=20disableCounter?= =?UTF-8?q?Metrics,=20disableStringSetMetrics,=20and=20di=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit c01ceeab24c7cae7167dbc08b21529339659094f. --- CHANGES.md | 1 - sdks/python/apache_beam/metrics/metric.py | 56 +--------- .../python/apache_beam/metrics/metric_test.py | 104 ------------------ sdks/python/apache_beam/pipeline.py | 2 - .../runners/worker/sdk_worker_main.py | 2 - 5 files changed, 4 insertions(+), 161 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ac8215f35494..698d88b01fab 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,7 +69,6 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)). ## Breaking Changes diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index a66eed640be6..b15237cae020 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -46,13 +46,11 @@ from apache_beam.metrics.metricbase import Histogram from apache_beam.metrics.metricbase import MetricName from apache_beam.metrics.metricbase import StringSet -from apache_beam.options.pipeline_options import DebugOptions if TYPE_CHECKING: from apache_beam.internal.metrics.metric import MetricLogger from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import Metric - from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils.histogram import BucketType __all__ = ['Metrics', 'MetricsFilter', 'Lineage'] @@ -60,37 +58,6 @@ _LOGGER = logging.getLogger(__name__) -class MetricsFlag(object): - """Process-wide flags controlling which user metric kinds are emitted.""" - counter_disabled = False - string_set_disabled = False - bounded_trie_disabled = False - _initialized = False - - @classmethod - def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: - if cls._initialized: - return - debug_options = options.view_as(DebugOptions) - if debug_options.lookup_experiment('disableCounterMetrics'): - cls.counter_disabled = True - _LOGGER.info('Counter metrics are disabled.') - if debug_options.lookup_experiment('disableStringSetMetrics'): - cls.string_set_disabled = True - _LOGGER.info('StringSet metrics are disabled.') - if debug_options.lookup_experiment('disableBoundedTrieMetrics'): - cls.bounded_trie_disabled = True - _LOGGER.info('BoundedTrie metrics are disabled.') - cls._initialized = True - - @classmethod - def reset(cls) -> None: - cls.counter_disabled = False - cls.string_set_disabled = False - cls.bounded_trie_disabled = False - cls._initialized = False - - class Metrics(object): """Lets users create/access metric objects during pipeline execution.""" @staticmethod @@ -237,17 +204,12 @@ class DelegatingCounter(Counter): def __init__( self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self._updater = MetricUpdater( + self.inc = MetricUpdater( # type: ignore[method-assign] cells.CounterCell, metric_name, default_value=1, process_wide=process_wide) - def inc(self, n: int = 1) -> None: - if MetricsFlag.counter_disabled: - return - self._updater(n) - class DelegatingDistribution(Distribution): """Metrics Distribution Delegates functionality to MetricsEnvironment.""" def __init__( @@ -269,23 +231,13 @@ class DelegatingStringSet(StringSet): """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) - self._updater = MetricUpdater(cells.StringSetCell, metric_name) - - def add(self, value: str) -> None: - if MetricsFlag.string_set_disabled: - return - self._updater(value) + self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign] class DelegatingBoundedTrie(BoundedTrie): - """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" + """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) - self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) - - def add(self, value) -> None: - if MetricsFlag.bounded_trie_disabled: - return - self._updater(value) + self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] class MetricResults(object): diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 6937236a8aad..ae66200737b5 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -32,9 +32,7 @@ from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metric import Metrics from apache_beam.metrics.metric import MetricsFilter -from apache_beam.metrics.metric import MetricsFlag from apache_beam.metrics.metricbase import MetricName -from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner from apache_beam.runners.worker import statesampler from apache_beam.testing.metric_result_matchers import DistributionMatcher @@ -123,108 +121,6 @@ def test_get_namespace_error(self): with self.assertRaises(ValueError): Metrics.get_namespace(object()) - def test_metrics_flag(self): - MetricsFlag.reset() - try: - self.assertFalse(MetricsFlag.counter_disabled) - self.assertFalse(MetricsFlag.string_set_disabled) - self.assertFalse(MetricsFlag.bounded_trie_disabled) - - options = PipelineOptions(['--experiments=disableCounterMetrics']) - MetricsFlag.set_default_pipeline_options(options) - self.assertTrue(MetricsFlag.counter_disabled) - self.assertFalse(MetricsFlag.string_set_disabled) - self.assertFalse(MetricsFlag.bounded_trie_disabled) - - MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableStringSetMetrics']) - MetricsFlag.set_default_pipeline_options(options) - self.assertFalse(MetricsFlag.counter_disabled) - self.assertTrue(MetricsFlag.string_set_disabled) - self.assertFalse(MetricsFlag.bounded_trie_disabled) - - MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) - MetricsFlag.set_default_pipeline_options(options) - self.assertFalse(MetricsFlag.counter_disabled) - self.assertFalse(MetricsFlag.string_set_disabled) - self.assertTrue(MetricsFlag.bounded_trie_disabled) - - MetricsFlag.reset() - options = PipelineOptions([ - '--experiments=disableCounterMetrics', - '--experiments=disableStringSetMetrics', - '--experiments=disableBoundedTrieMetrics', - ]) - MetricsFlag.set_default_pipeline_options(options) - self.assertTrue(MetricsFlag.counter_disabled) - self.assertTrue(MetricsFlag.string_set_disabled) - self.assertTrue(MetricsFlag.bounded_trie_disabled) - finally: - MetricsFlag.reset() - - def test_disabled_counter_is_noop(self): - sampler = statesampler.StateSampler('', counters.CounterFactory()) - statesampler.set_current_tracker(sampler) - state = sampler.scoped_state( - 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) - MetricsFlag.reset() - try: - sampler.start() - with state: - container = MetricsEnvironment.current_container() - Metrics.counter('ns', 'baseline').inc() - self.assertEqual(len(container.metrics), 1) - options = PipelineOptions(['--experiments=disableCounterMetrics']) - MetricsFlag.set_default_pipeline_options(options) - Metrics.counter('ns', 'after_disable').inc() - Metrics.counter('ns', 'after_disable').inc(5) - Metrics.counter('ns', 'after_disable').dec() - self.assertEqual(len(container.metrics), 1) - finally: - sampler.stop() - MetricsFlag.reset() - - def test_disabled_string_set_is_noop(self): - sampler = statesampler.StateSampler('', counters.CounterFactory()) - statesampler.set_current_tracker(sampler) - state = sampler.scoped_state( - 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) - MetricsFlag.reset() - try: - sampler.start() - with state: - container = MetricsEnvironment.current_container() - Metrics.string_set('ns', 'baseline').add('seed') - self.assertEqual(len(container.metrics), 1) - options = PipelineOptions(['--experiments=disableStringSetMetrics']) - MetricsFlag.set_default_pipeline_options(options) - Metrics.string_set('ns', 'after_disable').add('value') - self.assertEqual(len(container.metrics), 1) - finally: - sampler.stop() - MetricsFlag.reset() - - def test_disabled_bounded_trie_is_noop(self): - sampler = statesampler.StateSampler('', counters.CounterFactory()) - statesampler.set_current_tracker(sampler) - state = sampler.scoped_state( - 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) - MetricsFlag.reset() - try: - sampler.start() - with state: - container = MetricsEnvironment.current_container() - Metrics.bounded_trie('ns', 'baseline').add(['a']) - self.assertEqual(len(container.metrics), 1) - options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) - MetricsFlag.set_default_pipeline_options(options) - Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b']) - self.assertEqual(len(container.metrics), 1) - finally: - sampler.stop() - MetricsFlag.reset() - def test_counter_empty_name(self): with self.assertRaises(ValueError): Metrics.counter("namespace", "") diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 594660d9bea9..750868f7443a 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -73,7 +73,6 @@ from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems -from apache_beam.metrics.metric import MetricsFlag from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -193,7 +192,6 @@ def __init__( self._options = PipelineOptions([]) FileSystems.set_options(self._options) - MetricsFlag.set_default_pipeline_options(self._options) if runner is None: runner = self._options.view_as(StandardOptions).runner diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 58beda96d63d..754a631eaf33 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -33,7 +33,6 @@ from apache_beam.internal import pickler from apache_beam.io import filesystems -from apache_beam.metrics import metric from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -124,7 +123,6 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) - metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options) pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library pickler.set_library(pickle_library) From 43c1f2af622380c904bd84cd9af8bb74dab99c74 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 10 Jun 2026 15:50:53 -0400 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- sdks/python/apache_beam/metrics/metric.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index b15237cae020..6e6757be11d9 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -234,7 +234,7 @@ def __init__(self, metric_name: MetricName) -> None: self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign] class DelegatingBoundedTrie(BoundedTrie): - """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" + """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]