Skip to content

[Failing Test]: TestAnomalyDetection.test_multiple_detectors_without_aggregation_0 is possibly flaky #37779

@tvalentyn

Description

@tvalentyn

What happened?

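I saw the error below in this CI run: https://github.com/apache/beam/actions/runs/22721571966/job/65884874499?pr=37725

In short, the `assert_that` check failed because, for the elements (x1=2, x2=10) and (x1=3, x2=4), the `zscore_x1` prediction came back as score=nan, label=-2 instead of the expected scores 0.4898979485566356 and 0.16452254913212455 (label=0); the `zscore_x2` predictions for those elements matched. Full output: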

=================================== FAILURES ===================================
______ TestAnomalyDetection.test_multiple_detectors_without_aggregation_0 ______
[gw2] linux -- Python 3.13.3 /runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/bin/python

a = (<apache_beam.ml.anomaly.transforms_test.TestAnomalyDetection testMethod=test_multiple_detectors_without_aggregation_0>,)
kw = {}

    @wraps(func)
    def standalone_func(*a, **kw):
>       return func(*(a + p.args), **p.kwargs, **kw)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/parameterized/parameterized.py:620: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/anomaly/transforms_test.py:254: in test_multiple_detectors_without_aggregation
    with beam.Pipeline() as p:
         ^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:655: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7cf663edb380>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      last_error_text = None
    
      def read_messages() -> None:
        nonlocal last_error_text
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            mr = message.message_response
            logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
            if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
              last_error_text = mr.message_text
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
E         File "apache_beam/runners/common.py", line 1588, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise exn
E         File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
E           self.output_handler.handle_process_outputs(
E         File "apache_beam/runners/common.py", line 1683, in apache_beam.runners.common._OutputHandler.handle_process_outputs
E           self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E         File "apache_beam/runners/common.py", line 1796, in apache_beam.runners.common._OutputHandler._write_value_to_tag
E           self.main_receivers.receive(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E           self.consumer.process(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
E           with self.scoped_process_state:
E         File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
E           delayed_applications = self.dofn_runner.process(o)
E         File "apache_beam/runners/common.py", line 1500, in apache_beam.runners.common.DoFnRunner.process
E           self._reraise_augmented(exn, windowed_value)
E         File "apache_beam/runners/common.py", line 1609, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise new_exn
E         File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.process_method(*args_for_process, **kwargs_for_process),
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/transforms/core.py", line 2116, in <lambda>
E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E                                                 ~~^^^^^^^^^^^^^^^^^^^^
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/testing/util.py", line 203, in _equal
E           raise BeamAssertException(msg)
E       apache_beam.testing.util.BeamAssertException: Failed assert: [(1, AnomalyResult(example=Row(x1=1, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=2.1213203435596424, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=10, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5773502691896252, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.16452254913212455, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)])), (2, AnomalyResult(example=Row(x1=100, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)]))] == [(1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=1, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=10, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5773502691896252, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)])), (2, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=100, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), 
AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=2.1213203435596424, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.0, label=0, threshold=2, info='', source_predictions=None)]))], unexpected elements [(1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)]))], missing elements [(1, AnomalyResult(example=Row(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.16452254913212455, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)]))] [while running 'assert_that/Match']

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be sure the product is healthy)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions