From 8a7ca8cc0720296f1a7f2aa24d861f0d0a8fcad0 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 24 Feb 2026 20:48:13 +0000 Subject: [PATCH 1/8] feat: Add concurrency options to udf --- bigframes/functions/_function_client.py | 12 ++ bigframes/functions/_function_session.py | 4 + bigframes/pandas/__init__.py | 4 + bigframes/session/__init__.py | 4 + .../remote_function_concurrency.ipynb | 162 ++++++++++++++++++ 5 files changed, 186 insertions(+) create mode 100644 notebooks/remote_functions/remote_function_concurrency.ipynb diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index a82217da03..78e2bff950 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -404,6 +404,8 @@ def create_cloud_function( vpc_connector_egress_settings="private-ranges-only", memory_mib=1024, ingress_settings="internal-only", + workers=None, + concurrency=None, ): """Create a cloud function from the given user defined function.""" @@ -517,6 +519,12 @@ def create_cloud_function( function.service_config.service_account_email = ( self._cloud_function_service_account ) + if concurrency: + function.service_config.max_instance_request_concurrency = concurrency + if workers: + function.service_config.environment_variables = { + "WORKERS": str(workers) + } if ingress_settings not in _INGRESS_SETTINGS_MAP: raise bf_formatting.create_exception_with_feedback_link( ValueError, @@ -583,6 +591,8 @@ def provision_bq_remote_function( cloud_function_memory_mib, cloud_function_ingress_settings, bq_metadata, + workers, + concurrency, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package @@ -631,6 +641,8 @@ def provision_bq_remote_function( vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, memory_mib=cloud_function_memory_mib, ingress_settings=cloud_function_ingress_settings, + workers=workers, + concurrency=concurrency, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index b0fc25219a..9629e9abaa 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -253,6 +253,8 @@ def remote_function( "all", "internal-only", "internal-and-gclb" ] = "internal-only", cloud_build_service_account: Optional[str] = None, + workers: Optional[int] = 0, + concurrency: Optional[int] = 0, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -640,6 +642,8 @@ def wrapper(func): cloud_function_memory_mib=cloud_function_memory_mib, cloud_function_ingress_settings=cloud_function_ingress_settings, bq_metadata=bqrf_metadata, + workers=workers, + concurrency=concurrency, ) bigframes_cloud_function = ( diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index a70e319747..e3a5783711 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -93,6 +93,8 @@ def remote_function( "all", "internal-only", "internal-and-gclb" ] = "internal-only", cloud_build_service_account: Optional[str] = None, + workers: Optional[int] = 0, + concurrency: Optional[int] = 0, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -114,6 +116,8 @@ def remote_function( cloud_function_memory_mib=cloud_function_memory_mib, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, + workers=workers, + concurrency=concurrency, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 23f4178f3d..adc9813e72 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1542,6 +1542,8 @@ def remote_function( "all", "internal-only", "internal-and-gclb" ] = "internal-only", cloud_build_service_account: Optional[str] = None, + workers: Optional[int] = 0, + concurrency: Optional[int] = 0, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1770,6 +1772,8 @@ def remote_function( cloud_function_memory_mib=cloud_function_memory_mib, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, + workers=workers, + concurrency=concurrency, ) def deploy_udf( diff --git a/notebooks/remote_functions/remote_function_concurrency.ipynb b/notebooks/remote_functions/remote_function_concurrency.ipynb new file mode 100644 index 0000000000..72541c1279 --- /dev/null +++ b/notebooks/remote_functions/remote_function_concurrency.ipynb @@ -0,0 +1,162 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 15, + "id": "2ad860c6", + "metadata": {}, + "outputs": [], + "source": [ + "import bigframes.pandas as bpd\n", + "\n", + "bpd.options.bigquery.ordering_mode = 'partial'\n" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "03a64e81", + "metadata": {}, + "outputs": [], + "source": [ + "df = bpd.read_gbq(\"bigquery-public-data.baseball.schedules\")[[\"homeTeamName\", \"awayTeamName\", \"duration_minutes\"]]\n" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "b71fd6b4", + "metadata": {}, + "outputs": [], + "source": [ + "import math\n", + "\n", + "def moderate_cpu_work(n: int) -> int:\n", + " \"\"\"\n", + " Performs a fixed amount of CPU-bound math to test UDF overhead.\n", + " \"\"\"\n", + " if n is None:\n", + " return 0\n", + " \n", + " # Using a fixed iteration count ensures the 'work' is consistent \n", + " # regardless of the input value, which helps in benchmarking.\n", + " iterations = 100_000 \n", + " aggregate = float(n)\n", + " \n", + " for i in range(iterations):\n", + " # A mix of trig and power functions to keep the CPU busy\n", + " # We use i to ensure the calculation doesn't get optimized away\n", + " aggregate += math.sqrt(abs(math.sin(i) * math.cos(aggregate)))\n", + " \n", + " # Return a valid integer (modulo-ed to keep it clean)\n", + " return int(aggregate) % 1_000_000" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "651095ed", + "metadata": {}, + "outputs": [], + "source": [ + "classic_func = bpd.remote_function(reuse=False, cloud_function_service_account=\"default\", cloud_function_memory_mib=16384)(moderate_cpu_work)\n", + "concurrent_func = bpd.remote_function(reuse=False, cloud_function_service_account=\"default\", cloud_function_memory_mib=16384, workers=33, concurrency=32)(moderate_cpu_work)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "608f3659", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/google/home/tbergeron/src/bigframes/bigframes/core/logging/log_adapter.py:183: TimeTravelCacheWarning: Reading cached table from 2026-02-24 19:15:25.384225+00:00 to avoid\n", + "incompatibilies with previous reads of this table. To read the latest\n", + "version, set `use_cache=False` or close the current session with\n", + "Session.close() or bigframes.pandas.close_session().\n", + " return method(*args, **kwargs)\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + " Query processed 62.6 kB in 27 seconds of slot time. [Job bigframes-dev:US.71a54816-1dba-4943-9537-9f8322fd4a11 details]\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "df1 = bpd.read_gbq(\"bigquery-public-data.bitcoin_blockchain.blockss\")[[\"block_id\"]]\n", + "r1 = df1.assign(result_val=df1[\"block_id\"].apply(classic_func)).to_gbq()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "54fb7a54", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/google/home/tbergeron/src/bigframes/bigframes/core/logging/log_adapter.py:183: TimeTravelCacheWarning: Reading cached table from 2026-02-24 19:15:25.384225+00:00 to avoid\n", + "incompatibilies with previous reads of this table. To read the latest\n", + "version, set `use_cache=False` or close the current session with\n", + "Session.close() or bigframes.pandas.close_session().\n", + " return method(*args, **kwargs)\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + " Query processed 62.6 kB in 25 seconds of slot time. [Job bigframes-dev:US.cd16b23e-a47a-4c16-9cfe-0ba230a324dd details]\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "df2 = bpd.read_gbq(\"bigquery-public-data.bitcoin_blockchain.blocks\")[[\"block_id\"]]\n", + "r2 = df2.assign(result_val=df2[\"block_id\"].apply(concurrent_func)).to_gbq()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv (3.12.6)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From e64d9eca4ba506236578943ed6bad01b09f66453 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 02:21:14 +0000 Subject: [PATCH 2/8] expose cpu instead of concurrency --- bigframes/functions/_function_client.py | 50 ++++++++++++++++++++++-- bigframes/functions/_function_session.py | 8 +++- bigframes/pandas/__init__.py | 8 ++-- bigframes/session/__init__.py | 8 +++- 4 files changed, 63 insertions(+), 11 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 78e2bff950..5bd989e98c 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -65,6 +65,8 @@ # BQ managed functions (@udf) currently only support Python 3.11. _MANAGED_FUNC_PYTHON_VERSION = "python-3.11" +_DEFAULT_FUNCTION_MEMORY_MIB = 1024 + class FunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation. @@ -402,9 +404,11 @@ def create_cloud_function( is_row_processor=False, vpc_connector=None, vpc_connector_egress_settings="private-ranges-only", - memory_mib=1024, + memory_mib=None, + cpus=None, ingress_settings="internal-only", workers=None, + threads=None, concurrency=None, ): """Create a cloud function from the given user defined function.""" @@ -488,6 +492,8 @@ def create_cloud_function( function.service_config = functions_v2.ServiceConfig() if memory_mib is not None: function.service_config.available_memory = f"{memory_mib}Mi" + if cpus is not None: + function.service_config.available_cpu = str(cpus) if timeout_seconds is not None: if timeout_seconds > 1200: raise bf_formatting.create_exception_with_feedback_link( @@ -521,10 +527,15 @@ def create_cloud_function( ) if concurrency: function.service_config.max_instance_request_concurrency = concurrency + + env_vars = {} if workers: - function.service_config.environment_variables = { - "WORKERS": str(workers) - } + env_vars["WORKERS"] = str(workers) + if threads: + env_vars["THREADS"] = str(threads) + if env_vars: + function.service_config.environment_variables = env_vars + if ingress_settings not in _INGRESS_SETTINGS_MAP: raise bf_formatting.create_exception_with_feedback_link( ValueError, @@ -589,6 +600,7 @@ def provision_bq_remote_function( cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib, + cloud_function_cpus, cloud_function_ingress_settings, bq_metadata, workers, @@ -626,6 +638,18 @@ def provision_bq_remote_function( ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) + if (cloud_function_cpus is None) and (cloud_function_memory_mib is None): + cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB + + # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL + # therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker + expected_cpus = cloud_function_cpus or _infer_cpus_from_memory( + cloud_function_memory_mib + ) + workers = expected_cpus + concurrency = expected_cpus + threads = 2 # (per worker) + # Create the cloud function if it does not exist if not cf_endpoint: cf_endpoint = self.create_cloud_function( @@ -640,8 +664,10 @@ def provision_bq_remote_function( vpc_connector=cloud_function_vpc_connector, vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, memory_mib=cloud_function_memory_mib, + cpus=cloud_function_cpus, ingress_settings=cloud_function_ingress_settings, workers=workers, + threads=threads, concurrency=concurrency, ) else: @@ -708,3 +734,19 @@ def get_remote_function_specs(self, remote_function_name): # Note: list_routines doesn't make an API request until we iterate on the response object. pass return (http_endpoint, bq_connection) + + +def _infer_cpus_from_memory(memory_mib: int) -> int: + if memory_mib <= 2048: + # in actuality, will be 0.583 for 1024mb, 0.33 for 512mb, etc, but we round up to 1 + return 1 + elif memory_mib <= 8192: + return 2 + elif memory_mib <= 8192: + return 2 + elif memory_mib <= 16384: + return 4 + elif memory_mib <= 32768: + return 8 + else: + raise ValueError("Cloud run support at most 32768MiB per instance") diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 9629e9abaa..58f5173165 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -248,7 +248,8 @@ def remote_function( cloud_function_vpc_connector_egress_settings: Optional[ Literal["all", "private-ranges-only", "unspecified"] ] = None, - cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_memory_mib: Optional[int] = None, + cloud_function_cpus: Optional[int] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", @@ -446,6 +447,10 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_cpus (int, Optional): + The amounts of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. + https://docs.cloud.google.com/run/docs/configuring/services/cpu. cloud_function_ingress_settings (str, Optional): Ingress settings controls dictating what traffic can reach the function. Options are: `all`, `internal-only`, or `internal-and-gclb`. @@ -640,6 +645,7 @@ def wrapper(func): cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, bq_metadata=bqrf_metadata, workers=workers, diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index e3a5783711..df562ceadd 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -88,13 +88,12 @@ def remote_function( cloud_function_vpc_connector_egress_settings: Optional[ Literal["all", "private-ranges-only", "unspecified"] ] = None, - cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_memory_mib: Optional[int] = None, + cloud_function_cpus: Optional[int] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", cloud_build_service_account: Optional[str] = None, - workers: Optional[int] = 0, - concurrency: Optional[int] = 0, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -114,10 +113,9 @@ def remote_function( cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, - workers=workers, - concurrency=concurrency, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index adc9813e72..828f8dfac1 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1537,7 +1537,8 @@ def remote_function( cloud_function_vpc_connector_egress_settings: Optional[ Literal["all", "private-ranges-only", "unspecified"] ] = None, - cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_memory_mib: Optional[int] = None, + cloud_function_cpus: Optional[int] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", @@ -1720,6 +1721,10 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_cpus (int, Optional): + The amounts of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. + https://docs.cloud.google.com/run/docs/configuring/services/cpu. cloud_function_ingress_settings (str, Optional): Ingress settings controls dictating what traffic can reach the function. Options are: `all`, `internal-only`, or `internal-and-gclb`. @@ -1770,6 +1775,7 @@ def remote_function( cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, workers=workers, From efb54ac035b0efb15189ba9ac848371e5a400f42 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 18:09:57 +0000 Subject: [PATCH 3/8] use 4 threads per worker --- bigframes/functions/_function_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 5bd989e98c..a26923a4bf 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -647,8 +647,8 @@ def provision_bq_remote_function( cloud_function_memory_mib ) workers = expected_cpus - concurrency = expected_cpus - threads = 2 # (per worker) + threads = 4 # (per worker) + concurrency = workers * threads # Create the cloud function if it does not exist if not cf_endpoint: From cca80010c47e80c3c4601c9c1daf2bce3ef17f79 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 18:27:21 +0000 Subject: [PATCH 4/8] delete notebook --- .../remote_function_concurrency.ipynb | 162 ------------------ 1 file changed, 162 deletions(-) delete mode 100644 notebooks/remote_functions/remote_function_concurrency.ipynb diff --git a/notebooks/remote_functions/remote_function_concurrency.ipynb b/notebooks/remote_functions/remote_function_concurrency.ipynb deleted file mode 100644 index 72541c1279..0000000000 --- a/notebooks/remote_functions/remote_function_concurrency.ipynb +++ /dev/null @@ -1,162 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 15, - "id": "2ad860c6", - "metadata": {}, - "outputs": [], - "source": [ - "import bigframes.pandas as bpd\n", - "\n", - "bpd.options.bigquery.ordering_mode = 'partial'\n" - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "id": "03a64e81", - "metadata": {}, - "outputs": [], - "source": [ - "df = bpd.read_gbq(\"bigquery-public-data.baseball.schedules\")[[\"homeTeamName\", \"awayTeamName\", \"duration_minutes\"]]\n" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "id": "b71fd6b4", - "metadata": {}, - "outputs": [], - "source": [ - "import math\n", - "\n", - "def moderate_cpu_work(n: int) -> int:\n", - " \"\"\"\n", - " Performs a fixed amount of CPU-bound math to test UDF overhead.\n", - " \"\"\"\n", - " if n is None:\n", - " return 0\n", - " \n", - " # Using a fixed iteration count ensures the 'work' is consistent \n", - " # regardless of the input value, which helps in benchmarking.\n", - " iterations = 100_000 \n", - " aggregate = float(n)\n", - " \n", - " for i in range(iterations):\n", - " # A mix of trig and power functions to keep the CPU busy\n", - " # We use i to ensure the calculation doesn't get optimized away\n", - " aggregate += math.sqrt(abs(math.sin(i) * math.cos(aggregate)))\n", - " \n", - " # Return a valid integer (modulo-ed to keep it clean)\n", - " return int(aggregate) % 1_000_000" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "651095ed", - "metadata": {}, - "outputs": [], - "source": [ - "classic_func = bpd.remote_function(reuse=False, cloud_function_service_account=\"default\", cloud_function_memory_mib=16384)(moderate_cpu_work)\n", - "concurrent_func = bpd.remote_function(reuse=False, cloud_function_service_account=\"default\", cloud_function_memory_mib=16384, workers=33, concurrency=32)(moderate_cpu_work)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "608f3659", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/usr/local/google/home/tbergeron/src/bigframes/bigframes/core/logging/log_adapter.py:183: TimeTravelCacheWarning: Reading cached table from 2026-02-24 19:15:25.384225+00:00 to avoid\n", - "incompatibilies with previous reads of this table. To read the latest\n", - "version, set `use_cache=False` or close the current session with\n", - "Session.close() or bigframes.pandas.close_session().\n", - " return method(*args, **kwargs)\n" - ] - }, - { - "data": { - "text/html": [ - "\n", - " Query processed 62.6 kB in 27 seconds of slot time. [Job bigframes-dev:US.71a54816-1dba-4943-9537-9f8322fd4a11 details]\n", - " " - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "df1 = bpd.read_gbq(\"bigquery-public-data.bitcoin_blockchain.blockss\")[[\"block_id\"]]\n", - "r1 = df1.assign(result_val=df1[\"block_id\"].apply(classic_func)).to_gbq()\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "54fb7a54", - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/usr/local/google/home/tbergeron/src/bigframes/bigframes/core/logging/log_adapter.py:183: TimeTravelCacheWarning: Reading cached table from 2026-02-24 19:15:25.384225+00:00 to avoid\n", - "incompatibilies with previous reads of this table. To read the latest\n", - "version, set `use_cache=False` or close the current session with\n", - "Session.close() or bigframes.pandas.close_session().\n", - " return method(*args, **kwargs)\n" - ] - }, - { - "data": { - "text/html": [ - "\n", - " Query processed 62.6 kB in 25 seconds of slot time. [Job bigframes-dev:US.cd16b23e-a47a-4c16-9cfe-0ba230a324dd details]\n", - " " - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "df2 = bpd.read_gbq(\"bigquery-public-data.bitcoin_blockchain.blocks\")[[\"block_id\"]]\n", - "r2 = df2.assign(result_val=df2[\"block_id\"].apply(concurrent_func)).to_gbq()\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "venv (3.12.6)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.6" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} From c8a6394bab2246046ad5f27a52d5a6fc8709ae0c Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 18:49:53 +0000 Subject: [PATCH 5/8] remove dead params --- bigframes/functions/_function_client.py | 2 -- bigframes/functions/_function_session.py | 4 ---- bigframes/session/__init__.py | 4 ---- 3 files changed, 10 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index a26923a4bf..44726d1757 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -603,8 +603,6 @@ def provision_bq_remote_function( cloud_function_cpus, cloud_function_ingress_settings, bq_metadata, - workers, - concurrency, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 58f5173165..32c3d14eb7 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -254,8 +254,6 @@ def remote_function( "all", "internal-only", "internal-and-gclb" ] = "internal-only", cloud_build_service_account: Optional[str] = None, - workers: Optional[int] = 0, - concurrency: Optional[int] = 0, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -648,8 +646,6 @@ def wrapper(func): cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, bq_metadata=bqrf_metadata, - workers=workers, - concurrency=concurrency, ) bigframes_cloud_function = ( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 828f8dfac1..36c97dedc8 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1543,8 +1543,6 @@ def remote_function( "all", "internal-only", "internal-and-gclb" ] = "internal-only", cloud_build_service_account: Optional[str] = None, - workers: Optional[int] = 0, - concurrency: Optional[int] = 0, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1778,8 +1776,6 @@ def remote_function( cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, - workers=workers, - concurrency=concurrency, ) def deploy_udf( From f8f4f113de314030fe2f2980269e4beb19be8190 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 20:19:21 +0000 Subject: [PATCH 6/8] address comments --- bigframes/functions/_function_client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 44726d1757..ad46b84fe4 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -528,6 +528,9 @@ def create_cloud_function( if concurrency: function.service_config.max_instance_request_concurrency = concurrency + # Functions framework use environment variables to pass config to gunicorn + # See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241 + # Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43 env_vars = {} if workers: env_vars["WORKERS"] = str(workers) @@ -636,7 +639,7 @@ def provision_bq_remote_function( ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) - if (cloud_function_cpus is None) and (cloud_function_memory_mib is None): + if cloud_function_memory_mib is None: cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL @@ -735,16 +738,15 @@ def get_remote_function_specs(self, remote_function_name): def _infer_cpus_from_memory(memory_mib: int) -> int: + # observed values, not formally documented by cloud run functions if memory_mib <= 2048: # in actuality, will be 0.583 for 1024mb, 0.33 for 512mb, etc, but we round up to 1 return 1 elif memory_mib <= 8192: return 2 - elif memory_mib <= 8192: - return 2 elif memory_mib <= 16384: return 4 elif memory_mib <= 32768: return 8 else: - raise ValueError("Cloud run support at most 32768MiB per instance") + raise ValueError("Cloud run supports at most 32768MiB per instance") From 823ffbc365ddc93ea74a86b4f3139ca39f1f6503 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 22:19:47 +0000 Subject: [PATCH 7/8] fix concurrency to 1 if cpus < 1 --- bigframes/functions/_function_client.py | 32 +++++++++++++++---------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index ad46b84fe4..4038358778 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -644,12 +644,13 @@ def provision_bq_remote_function( # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL # therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker - expected_cpus = cloud_function_cpus or _infer_cpus_from_memory( - cloud_function_memory_mib - ) - workers = expected_cpus + expected_milli_cpus = ( + cloud_function_cpus * 1000 + ) or _infer_milli_cpus_from_memory(cloud_function_memory_mib) + workers = -(expected_milli_cpus // -1000) # ceil(cpus) without invoking floats threads = 4 # (per worker) - concurrency = workers * threads + # max concurrency==1 for vcpus < 1 hard limit from cloud run + concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1 # Create the cloud function if it does not exist if not cf_endpoint: @@ -737,16 +738,23 @@ def get_remote_function_specs(self, remote_function_name): return (http_endpoint, bq_connection) -def _infer_cpus_from_memory(memory_mib: int) -> int: +def _infer_milli_cpus_from_memory(memory_mib: int) -> int: # observed values, not formally documented by cloud run functions - if memory_mib <= 2048: - # in actuality, will be 0.583 for 1024mb, 0.33 for 512mb, etc, but we round up to 1 - return 1 + if memory_mib <= 128: + return 83 + elif memory_mib <= 256: + return 167 + elif memory_mib <= 512: + return 333 + elif memory_mib <= 1024: + return 583 + elif memory_mib <= 2048: + return 1000 elif memory_mib <= 8192: - return 2 + return 2000 elif memory_mib <= 16384: - return 4 + return 4000 elif memory_mib <= 32768: - return 8 + return 8000 else: raise ValueError("Cloud run supports at most 32768MiB per instance") From 33cd4b4b5e4e005b0c5ab39754a930d03f896a82 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 Feb 2026 22:51:09 +0000 Subject: [PATCH 8/8] fix cpu<1 cases --- bigframes/functions/_function_client.py | 6 ++- bigframes/functions/_function_session.py | 6 +-- bigframes/pandas/__init__.py | 2 +- bigframes/session/__init__.py | 6 +-- .../large/functions/test_remote_function.py | 43 +++++++++++++++---- 5 files changed, 46 insertions(+), 17 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 4038358778..66f8ca03f9 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -645,8 +645,10 @@ def provision_bq_remote_function( # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL # therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker expected_milli_cpus = ( - cloud_function_cpus * 1000 - ) or _infer_milli_cpus_from_memory(cloud_function_memory_mib) + int(cloud_function_cpus * 1000) + if (cloud_function_cpus is not None) + else _infer_milli_cpus_from_memory(cloud_function_memory_mib) + ) workers = -(expected_milli_cpus // -1000) # ceil(cpus) without invoking floats threads = 4 # (per worker) # max concurrency==1 for vcpus < 1 hard limit from cloud run diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 32c3d14eb7..7541936ede 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -249,7 +249,7 @@ def remote_function( Literal["all", "private-ranges-only", "unspecified"] ] = None, cloud_function_memory_mib: Optional[int] = None, - cloud_function_cpus: Optional[int] = None, + cloud_function_cpus: Optional[float] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", @@ -445,8 +445,8 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. - cloud_function_cpus (int, Optional): - The amounts of memory (in mebibytes) to allocate for the cloud + cloud_function_cpus (float, Optional): + The number of cpus to allocate for the cloud function (2nd gen) created. https://docs.cloud.google.com/run/docs/configuring/services/cpu. cloud_function_ingress_settings (str, Optional): diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index df562ceadd..fcb60bf778 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -89,7 +89,7 @@ def remote_function( Literal["all", "private-ranges-only", "unspecified"] ] = None, cloud_function_memory_mib: Optional[int] = None, - cloud_function_cpus: Optional[int] = None, + cloud_function_cpus: Optional[float] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 36c97dedc8..02aa8ebc52 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1538,7 +1538,7 @@ def remote_function( Literal["all", "private-ranges-only", "unspecified"] ] = None, cloud_function_memory_mib: Optional[int] = None, - cloud_function_cpus: Optional[int] = None, + cloud_function_cpus: Optional[float] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", @@ -1719,8 +1719,8 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. - cloud_function_cpus (int, Optional): - The amounts of memory (in mebibytes) to allocate for the cloud + cloud_function_cpus (float, Optional): + The number of cpus to allocate for the cloud function (2nd gen) created. https://docs.cloud.google.com/run/docs/configuring/services/cpu. cloud_function_ingress_settings (str, Optional): diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 2591c0c13a..6c6361a7f4 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -2093,19 +2093,40 @@ def foo_list(x: pandas.Series, y0: float, y1, y2) -> list[str]: @pytest.mark.parametrize( - ("memory_mib_args", "expected_memory"), + ( + "memory_mib_args", + "expected_memory", + "expected_cpus", + ), [ - pytest.param({}, "1024Mi", id="no-set"), - pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"), - pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"), - pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"), - pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"), - pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"), + pytest.param({}, "1024Mi", None, id="no-set"), + pytest.param( + {"cloud_function_memory_mib": None}, "1024Mi", None, id="set-None" + ), + pytest.param({"cloud_function_memory_mib": 128}, "128Mi", None, id="set-128"), + pytest.param( + {"cloud_function_memory_mib": 512, "cloud_function_cpus": 0.6}, + "512Mi", + "0.6", + id="set-512", + ), + pytest.param( + {"cloud_function_memory_mib": 1024}, "1024Mi", None, id="set-1024" + ), + pytest.param( + {"cloud_function_memory_mib": 4096, "cloud_function_cpus": 4}, + "4096Mi", + "4", + id="set-4096", + ), + pytest.param( + {"cloud_function_memory_mib": 32768}, "32768Mi", None, id="set-32768" + ), ], ) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_gcf_memory( - session, scalars_dfs, memory_mib_args, expected_memory + session, scalars_dfs, memory_mib_args, expected_memory, expected_cpus ): try: @@ -2121,6 +2142,12 @@ def square(x: int) -> int: name=square_remote.bigframes_cloud_function ) assert gcf.service_config.available_memory == expected_memory + if expected_cpus is not None: + assert gcf.service_config.available_cpu == expected_cpus + if float(gcf.service_config.available_cpu) >= 1.0: + assert gcf.service_config.max_instance_request_concurrency >= float( + gcf.service_config.available_cpu + ) scalars_df, scalars_pandas_df = scalars_dfs