[FLINK-38941][python] Add Python building blocks for async Python scalar function#27438

Closed
dianfu wants to merge 4 commits into apache:master from dianfu:python_async_scalar_func

Conversation

Contributor

@dianfu dianfu commented Jan 19, 2026

What is the purpose of the change

  • This pull request adds Python building blocks for async Python scalar functions.

Brief change log

  • Introduce Python AsyncScalarFunction and update the udf decorator to support async scalar functions
  • Introduce AsyncScalarFunctionOperation for Python AsyncScalarFunction execution
  • Add an async Python scalar function example
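
As a rough illustration of the first item, an async scalar function can be thought of as a class whose eval method is a coroutine, so that several rows can be in flight at once. The sketch below uses only asyncio. The class name AsyncUpperSketch and the helper run_batch are hypothetical, chosen for illustration, and are not the PyFlink API introduced by this PR.

```python
import asyncio


class AsyncUpperSketch:
    """Hypothetical stand-in for an async scalar function (not the PyFlink class)."""

    async def eval(self, value: str) -> str:
        # Simulate a non-blocking external lookup.
        await asyncio.sleep(0.01)
        return value.upper()


async def run_batch(func, rows):
    # Evaluate all rows concurrently; gather returns results in input order.
    return await asyncio.gather(*(func.eval(row) for row in rows))


results = asyncio.run(run_batch(AsyncUpperSketch(), ["a", "b", "c"]))
print(results)
```

The point of the sketch is only that eval awaits instead of blocking; how the operation schedules and collects results in the actual change is not shown here.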

Verifying this change

This change added tests and can be verified as follows:

  • Added tests in test_async_scalar_function.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented, will add in a separate PR)

@flinkbot
Collaborator

flinkbot commented Jan 19, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@dianfu dianfu force-pushed the python_async_scalar_func branch 2 times, most recently from 21307ae to 13766df Compare January 19, 2026 07:55
Contributor

@bgeng777 bgeng777 left a comment

Thanks for the PR. Just some minor comments.

self._output_processor = None

# Job parameters
self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}

Why doesn't the job_parameters field start with '_' as well?
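
One way to picture the naming question: if the field is internal, it could carry a leading underscore like its neighbours and be exposed read-only. This is a minimal sketch, not the code in the PR. The names JobParameter and OperationSketch are hypothetical, and whether job_parameters is meant to be public is not stated in this thread.

```python
from collections import namedtuple

# Hypothetical stand-in for the serialized job parameter entries.
JobParameter = namedtuple("JobParameter", ["key", "value"])


class OperationSketch:
    """Illustrative only; mirrors the two assignments quoted above."""

    def __init__(self, job_parameters):
        self._output_processor = None
        # Underscore-prefixed to match the other internal fields.
        self._job_parameters = {p.key: p.value for p in job_parameters}

    @property
    def job_parameters(self):
        # Read-only view: callers get a copy, not the internal dict.
        return dict(self._job_parameters)


op = OperationSketch([JobParameter("parallelism", "4")])
print(op.job_parameters)
```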


entry = self._queue.put(None, 0, 0, value)

async def execute_async(result_handler):

Line 149 directly uses result_handler as a variable name. I believe the code is correct, but renaming the input parameter here to something different might make the code more readable.
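
To make the readability concern concrete, here is a small sketch of a nested coroutine whose parameter is given a name different from the outer variable, so the two are never confused. The function make_task and the parameter name handler are hypothetical and only illustrate the suggested rename. They are not taken from the PR.

```python
import asyncio


def make_task(result_handler):
    # Outer name: `result_handler` is the object the caller passed in.

    async def execute_async(handler):
        # Inner parameter is named `handler` rather than `result_handler`,
        # so it does not shadow the outer variable.
        await asyncio.sleep(0)
        handler.append("done")

    # Returns the coroutine object; the caller decides when to run it.
    return execute_async(result_handler)


collected = []
asyncio.run(make_task(collected))
print(collected)
```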

Comment thread flink-python/pyflink/table/udf.py Outdated
@@ -671,7 +782,7 @@ def udf(f: Union[Callable, ScalarFunction, Type] = None,
(default: general)
:param udf_type: the type of the python function, available value: general, pandas,

It looks like :param udf_type is no longer among the input parameters, so we can remove this doc line.

        self.assertIn("Test exception in async eval", str(context.exception))

    def test_async_function_with_retry_logic(self):
        """Test async scalar function with custom retry logic."""

How do we enable retry for this test? I don't see any retry configs set here.


I found that the default value of table.exec.async-scalar.retry-strategy is FIXED_DELAY. I may have mixed this up with the DataStream API. Please ignore the previous comment.
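
For readers unfamiliar with the term, a fixed-delay strategy retries a failed call after a constant wait, up to a maximum number of attempts. The sketch below shows that idea in plain asyncio. The helper retry_fixed_delay, its parameters, and their default values are hypothetical and do not reflect how Flink implements or configures the strategy.

```python
import asyncio


async def retry_fixed_delay(coro_factory, max_attempts=3, delay_seconds=0.01):
    """Retry an async call with a constant delay between attempts (illustrative)."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception as error:  # broad on purpose for the sketch
            last_error = error
            if attempt < max_attempts:
                # Same wait before every retry: this is the "fixed delay".
                await asyncio.sleep(delay_seconds)
    raise last_error


calls = {"count": 0}


async def flaky_lookup():
    # Fails twice, then succeeds, to exercise the retry loop.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = asyncio.run(retry_fixed_delay(flaky_lookup))
print(result, calls["count"])
```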

@dianfu dianfu force-pushed the python_async_scalar_func branch from 86bf9f5 to 4dcbc36 Compare February 3, 2026 07:07
@github-actions github-actions bot added the community-reviewed label (PR has been reviewed by the community.) Feb 3, 2026
Contributor

@bgeng777 bgeng777 left a comment

LGTM!

@dianfu dianfu closed this in 5f5003a Feb 4, 2026
Labels

community-reviewed PR has been reviewed by the community.

3 participants