From 8339236320b671a61e97c2918c9d3aa288867132 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 13 Apr 2026 21:39:35 -0400 Subject: [PATCH 1/3] Add ExceptionInfo type for with_exception_handling --- sdks/python/apache_beam/transforms/core.py | 26 ++++++++++++++----- .../apache_beam/transforms/core_test.py | 17 ++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index b5c3178210d9..35fa0e19ebd8 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2312,6 +2312,24 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name return pardo +# Element format emitted on the dead-letter tag by `with_exception_handling()`: +# (exception_class, repr(exception), formatted_traceback_lines). +# The first slot is the bare metaclass `type` rather than `Type[BaseException]` +# because the runtime value is `type(exn)`, a class object whose only +# universally-correct hint is `type`. Beam has no parametric `Type[...]` +# constraint that would yield a non-pickle coder anyway, so narrowing here +# would over-promise without changing the wire format. +ExceptionInfo = typehints.Tuple[type, str, typehints.List[str]] + + +class DeadLetter: + """Type hint for a dead-letter element: (original_element, ExceptionInfo). + + Use as ``DeadLetter[T]`` in ``with_output_types(...)`` etc.""" + def __class_getitem__(cls, element_type): + return typehints.Tuple[element_type, ExceptionInfo] + + class _ExceptionHandlingWrapper(ptransform.PTransform): """Implementation of ParDo.with_exception_handling.""" def __init__( @@ -2455,13 +2473,7 @@ def expand(self, pcoll): main_output_type = self._fn.infer_output_type(pcoll.element_type) tagged_type_hints = dict(self._fn.get_type_hints().tagged_output_types()) - # Dead letter format: Tuple[element, Tuple[exception_type, repr, traceback]] - dead_letter_type = typehints.Tuple[pcoll.element_type, - typehints.Tuple[type, - str, - typehints.List[str]]] - - tagged_type_hints[self._dead_letter_tag] = dead_letter_type + tagged_type_hints[self._dead_letter_tag] = DeadLetter[pcoll.element_type] pardo = pardo.with_output_types(main_output_type, **tagged_type_hints) all_tags = tuple(set(self._extra_tags or ()) | {self._dead_letter_tag}) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index d80a03bdf53b..a1f50d3b1423 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -31,6 +31,7 @@ import apache_beam as beam from apache_beam.coders import coders from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.transforms.core import DeadLetter from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms.resources import ResourceHint @@ -559,9 +560,7 @@ def test_with_exception_handling_then_with_outputs(self): self.assertEqual(results.main.element_type, int) self.assertEqual(results.threes.element_type, int) self.assertEqual(results.fives.element_type, str) - self.assertEqual( - results.bad.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) + self.assertEqual(results.bad.element_type, DeadLetter[int]) def test_with_outputs_then_with_exception_handling(self): """Direction 2: .with_outputs().with_exception_handling()""" @@ -582,9 +581,7 @@ def test_with_outputs_then_with_exception_handling(self): self.assertEqual(results.main.element_type, int) self.assertEqual(results.threes.element_type, int) self.assertEqual(results.fives.element_type, str) - self.assertEqual( - results.bad.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) + self.assertEqual(results.bad.element_type, DeadLetter[int]) def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag( self): @@ -603,9 +600,7 @@ def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag( bad_elements = results.errors | beam.Keys() assert_that(bad_elements, equal_to([2]), 'errors') self.assertEqual(results.threes.element_type, int) - self.assertEqual( - results.errors.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) + self.assertEqual(results.errors.element_type, DeadLetter[int]) def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag( self): @@ -624,9 +619,7 @@ def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag( bad_elements = results.errors | beam.Keys() assert_that(bad_elements, equal_to([2]), 'errors') self.assertEqual(results.threes.element_type, int) - self.assertEqual( - results.errors.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) + self.assertEqual(results.errors.element_type, DeadLetter[int]) def test_exception_handling_no_with_outputs_backward_compat(self): """Without with_outputs(), behavior is unchanged.""" From d448f3400116122935fc79fb18cae3c7169c4321 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 14 Apr 2026 08:36:04 -0400 Subject: [PATCH 2/3] lint --- sdks/python/apache_beam/transforms/core_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index a1f50d3b1423..32594941433c 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -31,9 +31,9 @@ import apache_beam as beam from apache_beam.coders import coders from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.transforms.core import DeadLetter from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms.core import DeadLetter from apache_beam.transforms.resources import ResourceHint from apache_beam.transforms.userstate import BagStateSpec from apache_beam.transforms.userstate import ReadModifyWriteStateSpec From 2371f5768ae2f0d25d12e6ace8ae4f9bb6e0c3b4 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 15 Apr 2026 11:56:54 -0400 Subject: [PATCH 3/3] changes --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index aa9a49a16e68..bdcbd3451c7b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -73,6 +73,7 @@ encode finished bitset. SentinelBitSetCoder and BitSetCoder are state compatible. Both coders can decode encoded bytes from the other coder ([#38139](https://github.com/apache/beam/issues/38139)). +* (Python) Added type alias for with_exception_handling to be used for typehints. ([#38173](https://github.com/apache/beam/issues/38173)). ## Breaking Changes