Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)).

## Breaking Changes
Expand Down
56 changes: 52 additions & 4 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,51 @@
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet
from apache_beam.options.pipeline_options import DebugOptions

if TYPE_CHECKING:
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import Metric
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.histogram import BucketType

__all__ = ['Metrics', 'MetricsFilter', 'Lineage']

_LOGGER = logging.getLogger(__name__)


class MetricsFlag(object):
"""Process-wide flags controlling which user metric kinds are emitted."""
counter_disabled = False
string_set_disabled = False
bounded_trie_disabled = False
_initialized = False

@classmethod
def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
if cls._initialized:
return
debug_options = options.view_as(DebugOptions)
if debug_options.lookup_experiment('disableCounterMetrics'):
cls.counter_disabled = True
_LOGGER.info('Counter metrics are disabled.')
if debug_options.lookup_experiment('disableStringSetMetrics'):
cls.string_set_disabled = True
_LOGGER.info('StringSet metrics are disabled.')
if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
cls.bounded_trie_disabled = True
_LOGGER.info('BoundedTrie metrics are disabled.')
cls._initialized = True

@classmethod
def reset(cls) -> None:
cls.counter_disabled = False
cls.string_set_disabled = False
cls.bounded_trie_disabled = False
cls._initialized = False


class Metrics(object):
"""Lets users create/access metric objects during pipeline execution."""
@staticmethod
Expand Down Expand Up @@ -204,12 +237,17 @@ class DelegatingCounter(Counter):
def __init__(
self, metric_name: MetricName, process_wide: bool = False) -> None:
super().__init__(metric_name)
self.inc = MetricUpdater( # type: ignore[method-assign]
self._updater = MetricUpdater(
cells.CounterCell,
metric_name,
default_value=1,
process_wide=process_wide)

def inc(self, n: int = 1) -> None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this introduced a breaking change

TypeError: Metrics.DelegatingCounter.inc() got an unexpected keyword argument 'value'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed parameter name there is still other breakages. Investigating.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about this, the regression is mine, and thanks for the quick fix.

For the other breakages you are still seeing, could you point me at the suite that surfaced them? I will reproduce and take the fixes.

I can also add a regression test covering inc(value=...) so this does not regress again, and open the public PR for the value rename if you have not already pushed it.

if MetricsFlag.counter_disabled:
return
self._updater(n)
Comment on lines +246 to +249

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Access the counter_disabled class attribute directly instead of calling a classmethod getter to avoid method call overhead on the hot path.

Suggested change
def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled():
return
self._updater(n)
def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled:
return
self._updater(n)


class DelegatingDistribution(Distribution):
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
def __init__(
Expand All @@ -231,13 +269,23 @@ class DelegatingStringSet(StringSet):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.StringSetCell, metric_name)

def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled:
return
self._updater(value)

class DelegatingBoundedTrie(BoundedTrie):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)

def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled:
return
self._updater(value)
Comment on lines +274 to +288

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Access the string_set_disabled and bounded_trie_disabled class attributes directly instead of calling classmethod getters to avoid method call overhead on the hot path.

Suggested change
def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled():
return
self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled():
return
self._updater(value)
def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled:
return
self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled:
return
self._updater(value)



class MetricResults(object):
Expand Down
104 changes: 104 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.metrics.metric import MetricsFlag
from apache_beam.metrics.metricbase import MetricName
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.worker import statesampler
from apache_beam.testing.metric_result_matchers import DistributionMatcher
Expand Down Expand Up @@ -121,6 +123,108 @@ def test_get_namespace_error(self):
with self.assertRaises(ValueError):
Metrics.get_namespace(object())

def test_metrics_flag(self):
MetricsFlag.reset()
try:
self.assertFalse(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)

options = PipelineOptions(['--experiments=disableCounterMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertTrue(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertFalse(MetricsFlag.counter_disabled)
self.assertTrue(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertFalse(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertTrue(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
options = PipelineOptions([
'--experiments=disableCounterMetrics',
'--experiments=disableStringSetMetrics',
'--experiments=disableBoundedTrieMetrics',
])
MetricsFlag.set_default_pipeline_options(options)
self.assertTrue(MetricsFlag.counter_disabled)
self.assertTrue(MetricsFlag.string_set_disabled)
self.assertTrue(MetricsFlag.bounded_trie_disabled)
finally:
MetricsFlag.reset()

def test_disabled_counter_is_noop(self):
sampler = statesampler.StateSampler('', counters.CounterFactory())
statesampler.set_current_tracker(sampler)
state = sampler.scoped_state(
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
MetricsFlag.reset()
try:
sampler.start()
with state:
container = MetricsEnvironment.current_container()
Metrics.counter('ns', 'baseline').inc()
self.assertEqual(len(container.metrics), 1)
options = PipelineOptions(['--experiments=disableCounterMetrics'])
MetricsFlag.set_default_pipeline_options(options)
Metrics.counter('ns', 'after_disable').inc()
Metrics.counter('ns', 'after_disable').inc(5)
Metrics.counter('ns', 'after_disable').dec()
self.assertEqual(len(container.metrics), 1)
finally:
sampler.stop()
MetricsFlag.reset()

def test_disabled_string_set_is_noop(self):
sampler = statesampler.StateSampler('', counters.CounterFactory())
statesampler.set_current_tracker(sampler)
state = sampler.scoped_state(
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
MetricsFlag.reset()
try:
sampler.start()
with state:
container = MetricsEnvironment.current_container()
Metrics.string_set('ns', 'baseline').add('seed')
self.assertEqual(len(container.metrics), 1)
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
MetricsFlag.set_default_pipeline_options(options)
Metrics.string_set('ns', 'after_disable').add('value')
self.assertEqual(len(container.metrics), 1)
finally:
sampler.stop()
MetricsFlag.reset()

def test_disabled_bounded_trie_is_noop(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These ..._is_noop tests aren't checking anything.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 859715d. Each test now sets up a StateSampler with a MetricsContainer, runs a baseline inc/add to confirm the container grows to 1, then enables the disable experiment and runs another inc/add and asserts the count is still 1, proving no MetricCell was created.

sampler = statesampler.StateSampler('', counters.CounterFactory())
statesampler.set_current_tracker(sampler)
state = sampler.scoped_state(
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
MetricsFlag.reset()
try:
sampler.start()
with state:
container = MetricsEnvironment.current_container()
Metrics.bounded_trie('ns', 'baseline').add(['a'])
self.assertEqual(len(container.metrics), 1)
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
MetricsFlag.set_default_pipeline_options(options)
Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
self.assertEqual(len(container.metrics), 1)
finally:
sampler.stop()
MetricsFlag.reset()

def test_counter_empty_name(self):
with self.assertRaises(ValueError):
Metrics.counter("namespace", "")
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
from apache_beam.metrics.metric import MetricsFlag
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -192,6 +193,7 @@ def __init__(
self._options = PipelineOptions([])

FileSystems.set_options(self._options)
MetricsFlag.set_default_pipeline_options(self._options)

if runner is None:
runner = self._options.view_as(StandardOptions).runner
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from apache_beam.internal import pickler
from apache_beam.io import filesystems
from apache_beam.metrics import metric
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False):
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
filesystems.FileSystems.set_options(sdk_pipeline_options)
metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
pickler.set_library(pickle_library)

Expand Down
Loading