-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Open
Description
What happened?
Saw the below error on https://github.com/apache/beam/actions/runs/22721571966/job/65884874499?pr=37725:
=================================== FAILURES ===================================
______ TestAnomalyDetection.test_multiple_detectors_without_aggregation_0 ______
[gw2] linux -- Python 3.13.3 /runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/bin/python
a = (<apache_beam.ml.anomaly.transforms_test.TestAnomalyDetection testMethod=test_multiple_detectors_without_aggregation_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/anomaly/transforms_test.py:254: in test_multiple_detectors_without_aggregation
with beam.Pipeline() as p:
^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:655: in __exit__
self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7cf663edb380>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
E File "apache_beam/runners/common.py", line 1588, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E raise exn
E File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
E return self.do_fn_invoker.invoke_process(windowed_value)
E File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
E self.output_handler.handle_process_outputs(
E File "apache_beam/runners/common.py", line 1683, in apache_beam.runners.common._OutputHandler.handle_process_outputs
E self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E File "apache_beam/runners/common.py", line 1796, in apache_beam.runners.common._OutputHandler._write_value_to_tag
E self.main_receivers.receive(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E self.consumer.process(windowed_value)
E File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
E with self.scoped_process_state:
E File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
E delayed_applications = self.dofn_runner.process(o)
E File "apache_beam/runners/common.py", line 1500, in apache_beam.runners.common.DoFnRunner.process
E self._reraise_augmented(exn, windowed_value)
E File "apache_beam/runners/common.py", line 1609, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E raise new_exn
E File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
E return self.do_fn_invoker.invoke_process(windowed_value)
E File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E self._invoke_process_per_window(
E File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E self.process_method(*args_for_process, **kwargs_for_process),
E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/transforms/core.py", line 2116, in <lambda>
E wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E ~~^^^^^^^^^^^^^^^^^^^^
E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/testing/util.py", line 203, in _equal
E raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [(1, AnomalyResult(example=Row(x1=1, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=2.1213203435596424, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=10, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5773502691896252, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.16452254913212455, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)])), (2, AnomalyResult(example=Row(x1=100, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)]))] == [(1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=1, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=10, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5773502691896252, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)])), (2, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=100, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=2.1213203435596424, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.0, label=0, threshold=2, info='', source_predictions=None)]))], unexpected elements [(1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)]))], missing elements [(1, AnomalyResult(example=Row(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.16452254913212455, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)]))] [while running 'assert_that/Match']
Issue Failure
Failure: Test is flaky
Issue Priority
Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be sure the product is healthy)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner
Reactions are currently unavailable