From 43ce12f212fd0342e413f33afc2b34f3be9dd523 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 17 Apr 2026 18:35:47 +0000 Subject: [PATCH 01/16] js2py to mini-racer --- sdks/python/apache_beam/yaml/yaml_mapping.py | 151 +++++++++--------- sdks/python/apache_beam/yaml/yaml_udf_test.py | 41 ++++- sdks/python/setup.py | 3 +- 3 files changed, 108 insertions(+), 87 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index a6b2b5704751..da5726cb3f10 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -29,6 +29,10 @@ from typing import TypeVar from typing import Union +import json +import threading +import uuid + import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.portability.api import schema_pb2 @@ -53,13 +57,12 @@ from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn from apache_beam.yaml.yaml_provider import dicts_to_rows -# Import js2py package if it exists try: - import js2py - from js2py.base import JsObjectWrapper + from py_mini_racer import MiniRacer except ImportError: - js2py = None - JsObjectWrapper = object + MiniRacer = None + +_js_thread_funcs = {} _str_expression_fields = { 'AssignTimestamps': 'timestamp', @@ -178,18 +181,7 @@ def _check_mapping_arguments( raise ValueError(f'{transform_name} cannot specify "name" without "path"') -# js2py's JsObjectWrapper object has a self-referencing __dict__ property -# that cannot be pickled without implementing the __getstate__ and -# __setstate__ methods. -class _CustomJsObjectWrapper(JsObjectWrapper): - def __init__(self, js_obj): - super().__init__(js_obj.__dict__['_obj']) - - def __getstate__(self): - return self.__dict__.copy() - def __setstate__(self, state): - self.__dict__.update(state) # TODO(yaml) Improve type inferencing for JS UDF's @@ -205,83 +197,86 @@ def py_value_to_js_dict(py_value): return py_value +def js_to_py(obj): + """Converts mini-racer mapped objects to standard Python types. + + This is needed because ctx.eval returns JSMappedObjectImpl and JSArrayImpl + for JS objects and arrays, which are not picklable and would fail when Beam + tries to serialize rows containing them. We also preserve datetime objects + which are correctly produced by ctx.eval for JS Date objects. + """ + import datetime + from collections import abc + + type_name = type(obj).__name__ + if type_name == 'JSMappedObjectImpl': + return {k: js_to_py(v) for k, v in dict(obj).items()} + elif type_name == 'JSArrayImpl': + return [js_to_py(v) for v in list(obj)] + elif isinstance(obj, datetime.datetime): + return obj + elif isinstance(obj, dict): + return {k: js_to_py(v) for k, v in obj.items()} + elif not isinstance(obj, str) and isinstance(obj, abc.Iterable): + return [js_to_py(v) for v in list(obj)] + else: + return obj + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): - # Check for installed js2py package - if js2py is None: + if MiniRacer is None: raise ValueError( - "Javascript mapping functions are not supported on" - " Python 3.12 or later.") - - # import remaining js2py objects - from js2py import base - from js2py.constructors import jsdate - from js2py.internals import simplex - - js_array_type = ( - base.PyJsArray, - base.PyJsArrayBuffer, - base.PyJsInt8Array, - base.PyJsUint8Array, - base.PyJsUint8ClampedArray, - base.PyJsInt16Array, - base.PyJsUint16Array, - base.PyJsInt32Array, - base.PyJsUint32Array, - base.PyJsFloat32Array, - base.PyJsFloat64Array) - - def _js_object_to_py_object(obj): - if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)): - return base.to_python(obj) - elif isinstance(obj, js_array_type): - return [_js_object_to_py_object(value) for value in obj.to_list()] - elif isinstance(obj, jsdate.PyJsDate): - return obj.to_utc_dt() - elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)): - return None - elif isinstance(obj, base.PyJsError): - raise RuntimeError(obj['message']) - elif isinstance(obj, base.PyJsObject): - return { - key: _js_object_to_py_object(value['value']) - for (key, value) in obj.own.items() - } - elif isinstance(obj, base.JsObjectWrapper): - return _js_object_to_py_object(obj._obj) - - return obj - - if expression: - source = '\n'.join(['function(__row__) {'] + [ - f' {name} = __row__.{name}' - for name in original_fields if name in expression - ] + [' return (' + expression + ')'] + ['}']) - js_func = _CustomJsObjectWrapper(js2py.eval_js(source)) - - elif callable: - js_func = _CustomJsObjectWrapper(js2py.eval_js(callable)) + "JavaScript mapping functions require the 'mini-racer' package to be installed.") - else: + udf_code = None + if path: if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() - js = js2py.EvalJs() - js.eval(udf_code) - js_func = _CustomJsObjectWrapper(getattr(js, name)) + elif expression: + udf_code = f"var func = (__row__) => {{ " + " ".join([ + f"const {n} = __row__.{n};" + for n in original_fields if n in expression + ]) + f" return ({expression}); }}" + elif callable: + udf_code = f"var func = {callable}" + + udf_key = str(uuid.uuid4()) def js_wrapper(row): + tid = threading.get_ident() + + global _js_thread_funcs + # MiniRacer contexts are not picklable and cannot be shared across threads. + # We use a global dict keyed by thread ID to lazily create and cache a + # context per thread. + if tid not in _js_thread_funcs: + _js_thread_funcs[tid] = {} + + if udf_key not in _js_thread_funcs[tid]: + ctx = MiniRacer() + ctx.eval(udf_code) + # We use ctx.eval instead of ctx.call to ensure that JavaScript Date + # objects are correctly returned as Python datetime objects. + # We JSON-serialize the arguments to pass them safely to eval. + if expression or callable: + _js_thread_funcs[tid][udf_key] = lambda x: ctx.eval(f"func({json.dumps(x)})") + else: + _js_thread_funcs[tid][udf_key] = lambda x: ctx.eval(f"{name}({json.dumps(x)})") + + func = _js_thread_funcs[tid][udf_key] row_as_dict = py_value_to_js_dict(row) try: - js_result = js_func(row_as_dict) - except simplex.JsException as exn: + result = func(row_as_dict) + except Exception as exn: raise RuntimeError( - f"Error evaluating javascript expression: " - f"{exn.mes['message']}") from exn - return dicts_to_rows(_js_object_to_py_object(js_result)) + f"Error evaluating JavaScript expression: {exn}") from exn + result = js_to_py(result) + return dicts_to_rows(result) return js_wrapper diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 3d664ab9de41..3b52ef13184c 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -32,10 +32,10 @@ from apache_beam.yaml.yaml_transform import YamlTransform try: - import js2py + from py_mini_racer import MiniRacer except ImportError: - js2py = None - logging.warning('js2py is not installed; some tests will be skipped.') + MiniRacer = None + logging.warning('py_mini_racer is not installed; some tests will be skipped.') def as_rows(): @@ -63,7 +63,7 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_map_to_fields_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -197,7 +197,7 @@ def test_map_to_fields_sql_reserved_keyword_append(): beam.Row(label='389a', timestamp=2, label_copy="389a"), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -252,7 +252,7 @@ def test_filter_inline_py(self): row=beam.Row(rank=2, values=[7, 8, 9])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_filter_expression_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -296,7 +296,7 @@ def test_filter_expression_py(self): row=beam.Row(rank=0, values=[1, 2, 3])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_filter_inline_js_file(self): data = ''' function f(x) { @@ -373,6 +373,33 @@ def g(x): conductor=389, row=beam.Row(rank=2, values=[7, 8, 9])), ])) + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') + def test_map_to_fields_js_date(self): + import datetime + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle', + yaml_experimental_features=['javascript'])) as p: + elements = p | beam.Create([beam.Row(label='11a')]) + result = elements | YamlTransform( + ''' + type: MapToFields + config: + language: javascript + fields: + date: + callable: | + function get_date(x) { + return new Date('2026-04-17T18:00:00Z') + } + ''') + + expected_date = datetime.datetime(2026, 4, 17, 18, 0, 0, tzinfo=datetime.timezone.utc) + + assert_that( + result | as_rows(), + equal_to([ + beam.Row(date=expected_date), + ])) if __name__ == '__main__': diff --git a/sdks/python/setup.py b/sdks/python/setup.py index b3fb98d8b0ef..28f4bca09492 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -636,8 +636,7 @@ def get_portability_package_data(): 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', - # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'js2py>=0.74,<1; python_version<"3.12"', + 'mini-racer', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against From 7d2fcb99a93b669d708f666563692dbd83c7b106 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 17 Apr 2026 19:15:27 +0000 Subject: [PATCH 02/16] fix gemini comments --- sdks/python/apache_beam/yaml/yaml_mapping.py | 88 +++++++++---------- sdks/python/apache_beam/yaml/yaml_udf_test.py | 14 +-- 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index da5726cb3f10..9ca075c88334 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -16,8 +16,13 @@ # """This module defines the basic MapToFields operation.""" + +import datetime import itertools +import json import re +import threading +import uuid from collections import abc from collections.abc import Callable from collections.abc import Collection @@ -29,10 +34,6 @@ from typing import TypeVar from typing import Union -import json -import threading -import uuid - import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.portability.api import schema_pb2 @@ -62,7 +63,24 @@ except ImportError: MiniRacer = None -_js_thread_funcs = {} + +class _JsThreadContext: + def __init__(self): + self._local = threading.local() + + def get_funcs(self): + if not hasattr(self._local, 'funcs'): + self._local.funcs = {} + return self._local.funcs + + def __getstate__(self): + return {} + + def __setstate__(self, state): + self._local = threading.local() + + +_js_contexts = _JsThreadContext() _str_expression_fields = { 'AssignTimestamps': 'timestamp', @@ -181,9 +199,6 @@ def _check_mapping_arguments( raise ValueError(f'{transform_name} cannot specify "name" without "path"') - - - # TODO(yaml) Improve type inferencing for JS UDF's def py_value_to_js_dict(py_value): if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or @@ -200,25 +215,18 @@ def py_value_to_js_dict(py_value): def js_to_py(obj): """Converts mini-racer mapped objects to standard Python types. - This is needed because ctx.eval returns JSMappedObjectImpl and JSArrayImpl - for JS objects and arrays, which are not picklable and would fail when Beam - tries to serialize rows containing them. We also preserve datetime objects - which are correctly produced by ctx.eval for JS Date objects. + This is needed because ctx.eval returns objects that implement Mapping + and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl), + which would fail when Beam tries to serialize rows containing them. + We also preserve datetime objects which are correctly produced by ctx.eval + for JS Date objects. """ - import datetime - from collections import abc - - type_name = type(obj).__name__ - if type_name == 'JSMappedObjectImpl': - return {k: js_to_py(v) for k, v in dict(obj).items()} - elif type_name == 'JSArrayImpl': - return [js_to_py(v) for v in list(obj)] - elif isinstance(obj, datetime.datetime): + if isinstance(obj, datetime.datetime): return obj - elif isinstance(obj, dict): + elif isinstance(obj, Mapping): return {k: js_to_py(v) for k, v in obj.items()} - elif not isinstance(obj, str) and isinstance(obj, abc.Iterable): - return [js_to_py(v) for v in list(obj)] + elif not isinstance(obj, str) and isinstance(obj, Iterable): + return [js_to_py(v) for v in obj] else: return obj @@ -230,7 +238,8 @@ def _expand_javascript_mapping_func( if MiniRacer is None: raise ValueError( - "JavaScript mapping functions require the 'mini-racer' package to be installed.") + "JavaScript mapping functions require the 'mini-racer' package to be installed." + ) udf_code = None if path: @@ -239,8 +248,7 @@ def _expand_javascript_mapping_func( udf_code = FileSystems.open(path).read().decode() elif expression: udf_code = f"var func = (__row__) => {{ " + " ".join([ - f"const {n} = __row__.{n};" - for n in original_fields if n in expression + f"const {n} = __row__.{n};" for n in original_fields if n in expression ]) + f" return ({expression}); }}" elif callable: udf_code = f"var func = {callable}" @@ -248,27 +256,19 @@ def _expand_javascript_mapping_func( udf_key = str(uuid.uuid4()) def js_wrapper(row): - tid = threading.get_ident() - - global _js_thread_funcs - # MiniRacer contexts are not picklable and cannot be shared across threads. - # We use a global dict keyed by thread ID to lazily create and cache a - # context per thread. - if tid not in _js_thread_funcs: - _js_thread_funcs[tid] = {} - - if udf_key not in _js_thread_funcs[tid]: + funcs = _js_contexts.get_funcs() + + if udf_key not in funcs: ctx = MiniRacer() ctx.eval(udf_code) - # We use ctx.eval instead of ctx.call to ensure that JavaScript Date - # objects are correctly returned as Python datetime objects. - # We JSON-serialize the arguments to pass them safely to eval. + # We use ctx.call for efficiency. + # Note: This might return strings for Date objects instead of datetime. if expression or callable: - _js_thread_funcs[tid][udf_key] = lambda x: ctx.eval(f"func({json.dumps(x)})") + funcs[udf_key] = lambda x: ctx.call("func", x) else: - _js_thread_funcs[tid][udf_key] = lambda x: ctx.eval(f"{name}({json.dumps(x)})") - - func = _js_thread_funcs[tid][udf_key] + funcs[udf_key] = lambda x: ctx.call(name, x) + + func = funcs[udf_key] row_as_dict = py_value_to_js_dict(row) try: result = func(row_as_dict) diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 3b52ef13184c..b90789262e3a 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -373,12 +373,13 @@ def g(x): conductor=389, row=beam.Row(rank=2, values=[7, 8, 9])), ])) + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_map_to_fields_js_date(self): import datetime with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( - pickle_library='cloudpickle', - yaml_experimental_features=['javascript'])) as p: + pickle_library='cloudpickle', yaml_experimental_features=['javascript' + ])) as p: elements = p | beam.Create([beam.Row(label='11a')]) result = elements | YamlTransform( ''' @@ -392,12 +393,11 @@ def test_map_to_fields_js_date(self): return new Date('2026-04-17T18:00:00Z') } ''') - - expected_date = datetime.datetime(2026, 4, 17, 18, 0, 0, tzinfo=datetime.timezone.utc) - + + expected_date = '2026-04-17T18:00:00.000Z' + assert_that( - result | as_rows(), - equal_to([ + result | as_rows(), equal_to([ beam.Row(date=expected_date), ])) From 8b5d5e6bde89b24f8f98b071e63c85811fe8bdfb Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 17 Apr 2026 20:13:03 +0000 Subject: [PATCH 03/16] minor tweaks for datetime and gemini comments --- sdks/python/apache_beam/yaml/yaml_mapping.py | 8 ++++++-- sdks/python/apache_beam/yaml/yaml_udf_test.py | 8 +++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 9ca075c88334..8fa7e1a011e1 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -19,7 +19,6 @@ import datetime import itertools -import json import re import threading import uuid @@ -225,8 +224,13 @@ def js_to_py(obj): return obj elif isinstance(obj, Mapping): return {k: js_to_py(v) for k, v in obj.items()} - elif not isinstance(obj, str) and isinstance(obj, Iterable): + elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable): return [js_to_py(v) for v in obj] + elif isinstance(obj, str): + try: + return datetime.datetime.fromisoformat(obj) + except ValueError: + return obj else: return obj diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index b90789262e3a..33d62fc7d80b 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -394,12 +394,10 @@ def test_map_to_fields_js_date(self): } ''') - expected_date = '2026-04-17T18:00:00.000Z' + expected_date = datetime.datetime( + 2026, 4, 17, 18, tzinfo=datetime.timezone.utc) - assert_that( - result | as_rows(), equal_to([ - beam.Row(date=expected_date), - ])) + assert_that(result | as_rows(), equal_to([beam.Row(date=expected_date)])) if __name__ == '__main__': From 58a58cdd97bca408986a92b655bc4bd2d1c4ce45 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 20 Apr 2026 19:40:36 +0000 Subject: [PATCH 04/16] try to fix test failure --- sdks/python/apache_beam/yaml/yaml_mapping.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 8fa7e1a011e1..abc063d06ccd 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -228,6 +228,8 @@ def js_to_py(obj): return [js_to_py(v) for v in obj] elif isinstance(obj, str): try: + if obj.endswith('Z'): + return datetime.datetime.fromisoformat(obj[:-1] + '+00:00') return datetime.datetime.fromisoformat(obj) except ValueError: return obj From 2b47fe9af2a6001159c1e6bbcd4036c22d526b97 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 21 Apr 2026 16:11:06 +0000 Subject: [PATCH 05/16] start post yaml check --- .github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json index 541dc4ea8e87..8ed972c9f579 100644 --- a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 2 + "revision": 3 } From c9dedd59ac9ecef90ac94d4b82f4033067754cf6 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 21 Apr 2026 18:43:55 +0000 Subject: [PATCH 06/16] add datetime regex per gemini comment --- sdks/python/apache_beam/yaml/yaml_mapping.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index abc063d06ccd..5dbcd5476ed6 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -81,6 +81,9 @@ def __setstate__(self, state): _js_contexts = _JsThreadContext() +_JS_DATE_ISO_REGEX = re.compile( + r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$') + _str_expression_fields = { 'AssignTimestamps': 'timestamp', 'Filter': 'keep', @@ -227,12 +230,12 @@ def js_to_py(obj): elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable): return [js_to_py(v) for v in obj] elif isinstance(obj, str): - try: - if obj.endswith('Z'): + if _JS_DATE_ISO_REGEX.match(obj): + try: return datetime.datetime.fromisoformat(obj[:-1] + '+00:00') - return datetime.datetime.fromisoformat(obj) - except ValueError: - return obj + except ValueError: + return obj + return obj else: return obj From de51d69f0b86a7d00fc7fae933fd591d34148c35 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 22 Apr 2026 20:39:50 +0000 Subject: [PATCH 07/16] change design to handle instances inside dofn --- sdks/python/apache_beam/yaml/yaml_mapping.py | 166 ++++++++++++------- 1 file changed, 106 insertions(+), 60 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 5dbcd5476ed6..95b3d10d577d 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -62,25 +62,6 @@ except ImportError: MiniRacer = None - -class _JsThreadContext: - def __init__(self): - self._local = threading.local() - - def get_funcs(self): - if not hasattr(self._local, 'funcs'): - self._local.funcs = {} - return self._local.funcs - - def __getstate__(self): - return {} - - def __setstate__(self, state): - self._local = threading.local() - - -_js_contexts = _JsThreadContext() - _JS_DATE_ISO_REGEX = re.compile( r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$') @@ -240,10 +221,90 @@ def js_to_py(obj): return obj +class JsFilterDoFn(beam.DoFn): + def __init__(self, udf_code, function_name): + self.udf_code = udf_code + self.function_name = function_name + self.ctx = None + + def setup(self): + self.ctx = MiniRacer() + self.ctx.eval(self.udf_code) + + def process(self, element): + row_as_dict = py_value_to_js_dict(element) + result = self.ctx.call(self.function_name, row_as_dict) + result = js_to_py(result) + if result: + yield element + + +class JsMapToFieldsDoFn(beam.DoFn): + def __init__(self, fields, original_fields, input_schema): + self.fields = fields + self.original_fields = original_fields + self.input_schema = input_schema + self.ctx = None + self.field_funcs = {} + self.passthrough_fields = [] + + def setup(self): + self.ctx = MiniRacer() + script = [] + for name, expr in self.fields.items(): + if isinstance(expr, str) and expr in self.input_schema: + self.passthrough_fields.append((name, expr)) + continue + + if isinstance(expr, str): + expr = {'expression': expr} + + if 'expression' in expr: + e = expr['expression'] + code = f"var func_{name} = (__row__) => {{ " + " ".join([ + f"const {n} = __row__.{n};" for n in self.original_fields if n in e + ]) + f" return ({e}); }}" + script.append(code) + self.field_funcs[name] = f"func_{name}" + elif 'callable' in expr: + code = f"var func_{name} = {expr['callable']}" + script.append(code) + self.field_funcs[name] = f"func_{name}" + elif 'path' in expr and 'name' in expr: + path = expr['path'] + func_name = expr['name'] + udf_code = FileSystems.open(path).read().decode() + script.append(udf_code) + self.field_funcs[name] = func_name + + if script: + self.ctx.eval("\n".join(script)) + + def process(self, element): + row_as_dict = py_value_to_js_dict(element) + result_dict = {} + + # Handle passthrough fields + for name, src in self.passthrough_fields: + result_dict[name] = row_as_dict.get(src) + + # Handle JS fields + for name, func_name in self.field_funcs.items(): + res = self.ctx.call(func_name, row_as_dict) + result_dict[name] = js_to_py(res) + + yield dicts_to_rows(result_dict) + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 -def _expand_javascript_mapping_func( - original_fields, expression=None, callable=None, path=None, name=None): +def _get_javascript_udf_code( + original_fields, + function_name="func", + expression=None, + callable=None, + path=None, + name=None): if MiniRacer is None: raise ValueError( @@ -255,39 +316,17 @@ def _expand_javascript_mapping_func( if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() + return udf_code, name elif expression: - udf_code = f"var func = (__row__) => {{ " + " ".join([ + udf_code = f"var {function_name} = (__row__) => {{ " + " ".join([ f"const {n} = __row__.{n};" for n in original_fields if n in expression ]) + f" return ({expression}); }}" + return udf_code, function_name elif callable: - udf_code = f"var func = {callable}" - - udf_key = str(uuid.uuid4()) - - def js_wrapper(row): - funcs = _js_contexts.get_funcs() - - if udf_key not in funcs: - ctx = MiniRacer() - ctx.eval(udf_code) - # We use ctx.call for efficiency. - # Note: This might return strings for Date objects instead of datetime. - if expression or callable: - funcs[udf_key] = lambda x: ctx.call("func", x) - else: - funcs[udf_key] = lambda x: ctx.call(name, x) - - func = funcs[udf_key] - row_as_dict = py_value_to_js_dict(row) - try: - result = func(row_as_dict) - except Exception as exn: - raise RuntimeError( - f"Error evaluating JavaScript expression: {exn}") from exn - result = js_to_py(result) - return dicts_to_rows(result) - - return js_wrapper + udf_code = f"var {function_name} = {callable}" + return udf_code, function_name + else: + raise ValueError("Must specify expression, callable, or path.") def _expand_python_mapping_func( @@ -394,14 +433,10 @@ def _as_callable(original_fields, expr, transform_name, language, input_schema): explicit_type = expr.pop('output_type', None) _check_mapping_arguments(transform_name, **expr) - if language == "javascript": - func = _expand_javascript_mapping_func(original_fields, **expr) - elif language in ("python", "generic", None): + if language in ("python", "generic", None): func = _expand_python_mapping_func(original_fields, **expr) else: - raise ValueError( - f'Unknown language for mapping transform: {language}. ' - 'Supported languages are "javascript" and "python."') + raise ValueError(f'Language {language} not supported in this context.') if explicit_type: if isinstance(explicit_type, str): @@ -640,8 +675,17 @@ def _PyJsFilter( error_handling: Whether and where to output records that throw errors when the above expressions are evaluated. """ # pylint: disable=line-too-long - keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') - return pcoll | beam.Filter(keep_fn) + if language == 'javascript': + if isinstance(keep, str): + keep = {'expression': keep} + udf_code, function_name = _get_javascript_udf_code( + [f.name for f in schema_from_element_type(pcoll.element_type).fields], + **keep + ) + return pcoll | beam.ParDo(JsFilterDoFn(udf_code, function_name)) + else: + keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') + return pcoll | beam.Filter(keep_fn) def is_expr(v): @@ -713,10 +757,12 @@ def _PyJsMapToFields( """ # pylint: disable=line-too-long input_schema, fields = normalize_fields( pcoll, fields, drop or (), append, language=language or 'generic') + original_fields = list(input_schema.keys()) + if language == 'javascript': options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript') - - original_fields = list(input_schema.keys()) + return pcoll | beam.ParDo( + JsMapToFieldsDoFn(fields, original_fields, input_schema)) return pcoll | beam.Select( **{ From 1a6b8177ee38f1114144bcaee80acd8377ae7cff Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 30 Apr 2026 14:32:13 +0000 Subject: [PATCH 08/16] address gemini comments --- sdks/python/apache_beam/yaml/yaml_mapping.py | 27 +++++++++----------- sdks/python/setup.py | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 95b3d10d577d..d3a6c9c274ee 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -20,8 +20,6 @@ import datetime import itertools import re -import threading -import uuid from collections import abc from collections.abc import Callable from collections.abc import Collection @@ -241,18 +239,13 @@ def process(self, element): class JsMapToFieldsDoFn(beam.DoFn): def __init__(self, fields, original_fields, input_schema): - self.fields = fields - self.original_fields = original_fields - self.input_schema = input_schema self.ctx = None self.field_funcs = {} self.passthrough_fields = [] - def setup(self): - self.ctx = MiniRacer() script = [] - for name, expr in self.fields.items(): - if isinstance(expr, str) and expr in self.input_schema: + for name, expr in fields.items(): + if isinstance(expr, str) and expr in input_schema: self.passthrough_fields.append((name, expr)) continue @@ -261,9 +254,9 @@ def setup(self): if 'expression' in expr: e = expr['expression'] - code = f"var func_{name} = (__row__) => {{ " + " ".join([ - f"const {n} = __row__.{n};" for n in self.original_fields if n in e - ]) + f" return ({e}); }}" + code = f"var func_{name} = (__row__) => {{ " + " ".join( + [f"const {n} = __row__.{n};" + for n in original_fields if n in e]) + f" return ({e}); }}" script.append(code) self.field_funcs[name] = f"func_{name}" elif 'callable' in expr: @@ -277,8 +270,12 @@ def setup(self): script.append(udf_code) self.field_funcs[name] = func_name - if script: - self.ctx.eval("\n".join(script)) + self.script = "\n".join(script) if script else None + + def setup(self): + self.ctx = MiniRacer() + if self.script: + self.ctx.eval(self.script) def process(self, element): row_as_dict = py_value_to_js_dict(element) @@ -308,7 +305,7 @@ def _get_javascript_udf_code( if MiniRacer is None: raise ValueError( - "JavaScript mapping functions require the 'mini-racer' package to be installed." + "JavaScript mapping functions require the 'py-mini-racer' package to be installed." ) udf_code = None diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 28f4bca09492..7ef31fdfc483 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -636,7 +636,7 @@ def get_portability_package_data(): 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', - 'mini-racer', + 'py-mini-racer', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against From cb752057446675ded1d3b6bbc452c3c28d5d4c4d Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 30 Apr 2026 18:27:24 +0000 Subject: [PATCH 09/16] update design --- sdks/python/apache_beam/yaml/yaml_mapping.py | 19 +++++++++++++------ sdks/python/setup.py | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index d3a6c9c274ee..ba9dacbe5e0f 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -20,7 +20,6 @@ import datetime import itertools import re -from collections import abc from collections.abc import Callable from collections.abc import Collection from collections.abc import Iterable @@ -187,7 +186,7 @@ def py_value_to_js_dict(py_value): py_value = py_value._asdict() if isinstance(py_value, dict): return {key: py_value_to_js_dict(value) for key, value in py_value.items()} - elif not isinstance(py_value, str) and isinstance(py_value, abc.Iterable): + elif not isinstance(py_value, str) and isinstance(py_value, Iterable): return [py_value_to_js_dict(value) for value in list(py_value)] else: return py_value @@ -270,6 +269,14 @@ def __init__(self, fields, original_fields, input_schema): script.append(udf_code) self.field_funcs[name] = func_name + if self.field_funcs: + aggregator_entries = ", ".join([ + f'"{name}": {func_name}(__row__)' + for name, func_name in self.field_funcs.items() + ]) + script.append( + f"var __aggregate_fn__ = (__row__) => ({{ {aggregator_entries} }});") + self.script = "\n".join(script) if script else None def setup(self): @@ -285,10 +292,10 @@ def process(self, element): for name, src in self.passthrough_fields: result_dict[name] = row_as_dict.get(src) - # Handle JS fields - for name, func_name in self.field_funcs.items(): - res = self.ctx.call(func_name, row_as_dict) - result_dict[name] = js_to_py(res) + # Handle JS fields via single aggregate call + if self.field_funcs: + res = self.ctx.call("__aggregate_fn__", row_as_dict) + result_dict.update(js_to_py(res)) yield dicts_to_rows(result_dict) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 7ef31fdfc483..70bb25fb6c57 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -636,7 +636,7 @@ def get_portability_package_data(): 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', - 'py-mini-racer', + 'mini-racer>=0.14.1', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against From a5034d0985805423c5b61deb012bfd6c21a30071 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 30 Apr 2026 18:34:51 +0000 Subject: [PATCH 10/16] remove uncessary comment --- sdks/python/apache_beam/yaml/yaml_mapping.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index ba9dacbe5e0f..4512c96cc39b 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -300,8 +300,6 @@ def process(self, element): yield dicts_to_rows(result_dict) -# TODO(yaml) Consider adding optional language version parameter to support -# ECMAScript 5 and 6 def _get_javascript_udf_code( original_fields, function_name="func", From ea16899e1b95264356ab743d218604ec04ea2173 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 30 Apr 2026 18:36:28 +0000 Subject: [PATCH 11/16] move import statement --- sdks/python/apache_beam/yaml/yaml_udf_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 33d62fc7d80b..889900eac4f4 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import datetime import logging import os import shutil @@ -376,7 +377,6 @@ def g(x): @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_map_to_fields_js_date(self): - import datetime with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' ])) as p: From a8b58c2f7e7b222abaa8f24fbe966cb4e1b10e20 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 1 May 2026 19:11:22 +0000 Subject: [PATCH 12/16] address gemini comment --- sdks/python/apache_beam/yaml/yaml_mapping.py | 21 ++++++++---- sdks/python/apache_beam/yaml/yaml_udf_test.py | 32 ++++++++++++++++++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 4512c96cc39b..238c697dd36e 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -243,7 +243,7 @@ def __init__(self, fields, original_fields, input_schema): self.passthrough_fields = [] script = [] - for name, expr in fields.items(): + for i, (name, expr) in enumerate(fields.items()): if isinstance(expr, str) and expr in input_schema: self.passthrough_fields.append((name, expr)) continue @@ -251,17 +251,21 @@ def __init__(self, fields, original_fields, input_schema): if isinstance(expr, str): expr = {'expression': expr} + # We use numeric indexing (func_{i}) instead of reusing the output field + # name to prevent syntax errors if output names contain spaces or hyphens. + # We also use bracket notation for robustness against input field names + # that aren't compliant dot-access identifiers. if 'expression' in expr: e = expr['expression'] - code = f"var func_{name} = (__row__) => {{ " + " ".join( - [f"const {n} = __row__.{n};" + code = f"var func_{i} = (__row__) => {{ " + " ".join( + [f"const {n} = __row__['{n}'];" for n in original_fields if n in e]) + f" return ({e}); }}" script.append(code) - self.field_funcs[name] = f"func_{name}" + self.field_funcs[name] = f"func_{i}" elif 'callable' in expr: - code = f"var func_{name} = {expr['callable']}" + code = f"var func_{i} = {expr['callable']}" script.append(code) - self.field_funcs[name] = f"func_{name}" + self.field_funcs[name] = f"func_{i}" elif 'path' in expr and 'name' in expr: path = expr['path'] func_name = expr['name'] @@ -320,8 +324,11 @@ def _get_javascript_udf_code( udf_code = FileSystems.open(path).read().decode() return udf_code, name elif expression: + # We use bracket notation for robustness against field names that + # aren't compliant dot-access identifiers. udf_code = f"var {function_name} = (__row__) => {{ " + " ".join([ - f"const {n} = __row__.{n};" for n in original_fields if n in expression + f"const {n} = __row__['{n}'];" + for n in original_fields if n in expression ]) + f" return ({expression}); }}" return udf_code, function_name elif callable: diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 889900eac4f4..ef525032df6d 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -397,7 +397,37 @@ def test_map_to_fields_js_date(self): expected_date = datetime.datetime( 2026, 4, 17, 18, tzinfo=datetime.timezone.utc) - assert_that(result | as_rows(), equal_to([beam.Row(date=expected_date)])) + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') + def test_map_to_fields_js_special_names(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle', yaml_experimental_features=['javascript' + ])) as p: + elements = p | beam.Create([beam.Row(label='test')]) + result = elements | YamlTransform( + ''' + type: MapToFields + config: + language: javascript + fields: + 'weird output-name': + expression: "label + '-ok'" + ''') + + # Verify that it yields a single row with the transformed output. + # We use as_dict here because comparing typed Row with spaces can fail + # downstream assertions if mapped back to a python tuple. + def row_to_dict(r): + return { + k: getattr(r, k) + for k in dir(r) + if not k.startswith('_') and not callable(getattr(r, k)) + } + + assert_that( + result | beam.Map(lambda r: dict(r._asdict())), + equal_to([{ + 'weird output-name': 'test-ok' + }])) if __name__ == '__main__': From b34f344b162f1b849a9041c3a706dadced21b8f9 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 4 May 2026 14:52:53 +0000 Subject: [PATCH 13/16] address more gemini comments --- sdks/python/apache_beam/yaml/yaml_mapping.py | 11 ++++++++--- sdks/python/apache_beam/yaml/yaml_udf_test.py | 3 +++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 238c697dd36e..5d79b7cefcf7 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -257,9 +257,14 @@ def __init__(self, fields, original_fields, input_schema): # that aren't compliant dot-access identifiers. if 'expression' in expr: e = expr['expression'] - code = f"var func_{i} = (__row__) => {{ " + " ".join( - [f"const {n} = __row__['{n}'];" - for n in original_fields if n in e]) + f" return ({e}); }}" + js_identifier_pattern = re.compile(r'^[a-zA-Z_$][a-zA-Z0-9_$]*$') + valid_fields = [ + n for n in original_fields + if n in e and js_identifier_pattern.match(n) + ] + consts = " ".join( + [f"const {n} = __row__['{n}'];" for n in valid_fields]) + code = f"var func_{i} = (__row__) => {{ {consts} return ({e}); }}" script.append(code) self.field_funcs[name] = f"func_{i}" elif 'callable' in expr: diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index ef525032df6d..b339f2436810 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -396,6 +396,9 @@ def test_map_to_fields_js_date(self): expected_date = datetime.datetime( 2026, 4, 17, 18, tzinfo=datetime.timezone.utc) + assert_that(result, equal_to([ + beam.Row(date=expected_date), + ])) @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_map_to_fields_js_special_names(self): From 1f3c2762e09ddea4c6dc4d789219c1bd200600b8 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 5 May 2026 16:04:28 +0000 Subject: [PATCH 14/16] address a few minor gemini concerns --- sdks/python/apache_beam/yaml/yaml_mapping.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 5d79b7cefcf7..887d0159377b 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -61,6 +61,7 @@ _JS_DATE_ISO_REGEX = re.compile( r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$') +_JS_IDENTIFIER_PATTERN = re.compile(r'^[a-zA-Z_$][a-zA-Z0-9_$]*$') _str_expression_fields = { 'AssignTimestamps': 'timestamp', @@ -257,10 +258,9 @@ def __init__(self, fields, original_fields, input_schema): # that aren't compliant dot-access identifiers. if 'expression' in expr: e = expr['expression'] - js_identifier_pattern = re.compile(r'^[a-zA-Z_$][a-zA-Z0-9_$]*$') valid_fields = [ n for n in original_fields - if n in e and js_identifier_pattern.match(n) + if n in e and _JS_IDENTIFIER_PATTERN.match(n) ] consts = " ".join( [f"const {n} = __row__['{n}'];" for n in valid_fields]) @@ -775,6 +775,10 @@ def _PyJsMapToFields( if language == 'javascript': options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript') + if MiniRacer is None: + raise ValueError( + "JavaScript mapping functions require the 'py-mini-racer' package to be installed." + ) return pcoll | beam.ParDo( JsMapToFieldsDoFn(fields, original_fields, input_schema)) From 8db673739b05d41a757f76472dd3760a7dd12608 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 6 May 2026 13:15:46 +0000 Subject: [PATCH 15/16] use jinja template to help simplify the code context --- sdks/python/apache_beam/yaml/yaml_mapping.py | 35 ++++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 887d0159377b..8b6ec6969b2d 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -30,6 +30,7 @@ from typing import TypeVar from typing import Union +import jinja2 import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.portability.api import schema_pb2 @@ -69,6 +70,25 @@ 'Partition': 'by', } +JS_EXPR_TEMPLATE = jinja2.Template( + """ +var {{ func_id }} = (__row__) => { + {% for field in valid_fields %} + const {{ field }} = __row__['{{ field }}']; + {% endfor %} + return ({{ expr }}); +}; +""") + +JS_AGGREGATOR_TEMPLATE = jinja2.Template( + """ +var __aggregate_fn__ = (__row__) => ({ + {% for name, func_name in field_funcs.items() %} + "{{ name }}": {{ func_name }}(__row__){% if not loop.last %},{% endif %} + {% endfor %} +}); +""") + def normalize_mapping(spec): """ @@ -262,10 +282,9 @@ def __init__(self, fields, original_fields, input_schema): n for n in original_fields if n in e and _JS_IDENTIFIER_PATTERN.match(n) ] - consts = " ".join( - [f"const {n} = __row__['{n}'];" for n in valid_fields]) - code = f"var func_{i} = (__row__) => {{ {consts} return ({e}); }}" - script.append(code) + code = JS_EXPR_TEMPLATE.render( + func_id=f"func_{i}", valid_fields=valid_fields, expr=e) + script.append(code.strip()) self.field_funcs[name] = f"func_{i}" elif 'callable' in expr: code = f"var func_{i} = {expr['callable']}" @@ -279,12 +298,8 @@ def __init__(self, fields, original_fields, input_schema): self.field_funcs[name] = func_name if self.field_funcs: - aggregator_entries = ", ".join([ - f'"{name}": {func_name}(__row__)' - for name, func_name in self.field_funcs.items() - ]) - script.append( - f"var __aggregate_fn__ = (__row__) => ({{ {aggregator_entries} }});") + code = JS_AGGREGATOR_TEMPLATE.render(field_funcs=self.field_funcs) + script.append(code.strip()) self.script = "\n".join(script) if script else None From 0b85e285114bccc08ba849d164534286af3d061a Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 6 May 2026 14:01:04 +0000 Subject: [PATCH 16/16] address lint issue --- sdks/python/apache_beam/yaml/yaml_mapping.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 8b6ec6969b2d..30cf91923da2 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -31,6 +31,7 @@ from typing import Union import jinja2 + import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.portability.api import schema_pb2