diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index a82217da03..66f8ca03f9 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -65,6 +65,8 @@ # BQ managed functions (@udf) currently only support Python 3.11. _MANAGED_FUNC_PYTHON_VERSION = "python-3.11" +_DEFAULT_FUNCTION_MEMORY_MIB = 1024 + class FunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation. @@ -402,8 +404,12 @@ def create_cloud_function( is_row_processor=False, vpc_connector=None, vpc_connector_egress_settings="private-ranges-only", - memory_mib=1024, + memory_mib=None, + cpus=None, ingress_settings="internal-only", + workers=None, + threads=None, + concurrency=None, ): """Create a cloud function from the given user defined function.""" @@ -486,6 +492,8 @@ def create_cloud_function( function.service_config = functions_v2.ServiceConfig() if memory_mib is not None: function.service_config.available_memory = f"{memory_mib}Mi" + if cpus is not None: + function.service_config.available_cpu = str(cpus) if timeout_seconds is not None: if timeout_seconds > 1200: raise bf_formatting.create_exception_with_feedback_link( @@ -517,6 +525,20 @@ def create_cloud_function( function.service_config.service_account_email = ( self._cloud_function_service_account ) + if concurrency: + function.service_config.max_instance_request_concurrency = concurrency + + # Functions framework use environment variables to pass config to gunicorn + # See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241 + # Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43 + env_vars = {} + if workers: + env_vars["WORKERS"] = str(workers) + if threads: + env_vars["THREADS"] = str(threads) + if env_vars: + function.service_config.environment_variables = env_vars + if ingress_settings not in _INGRESS_SETTINGS_MAP: 
raise bf_formatting.create_exception_with_feedback_link( ValueError, @@ -581,6 +603,7 @@ def provision_bq_remote_function( cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib, + cloud_function_cpus, cloud_function_ingress_settings, bq_metadata, ): @@ -616,6 +639,21 @@ def provision_bq_remote_function( ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) + if cloud_function_memory_mib is None: + cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB + + # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL + # therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker + expected_milli_cpus = ( + int(cloud_function_cpus * 1000) + if (cloud_function_cpus is not None) + else _infer_milli_cpus_from_memory(cloud_function_memory_mib) + ) + workers = -(expected_milli_cpus // -1000) # ceil(cpus) without invoking floats + threads = 4 # (per worker) + # max concurrency==1 for vcpus < 1 hard limit from cloud run + concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1 + # Create the cloud function if it does not exist if not cf_endpoint: cf_endpoint = self.create_cloud_function( @@ -630,7 +668,11 @@ def provision_bq_remote_function( vpc_connector=cloud_function_vpc_connector, vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, memory_mib=cloud_function_memory_mib, + cpus=cloud_function_cpus, ingress_settings=cloud_function_ingress_settings, + workers=workers, + threads=threads, + concurrency=concurrency, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -696,3 +738,25 @@ def get_remote_function_specs(self, remote_function_name): # Note: list_routines doesn't make an API request until we iterate on the response object. 
# (inclusive upper memory bound in MiB, milli-vCPUs) tiers used when the caller
# does not request an explicit CPU count. These are observed values, not
# formally documented by cloud run functions. Keep sorted by ascending bound:
# the lookup below returns the first tier whose bound fits.
_MEMORY_MIB_TO_MILLI_CPUS = (
    (128, 83),
    (256, 167),
    (512, 333),
    (1024, 583),
    (2048, 1000),
    (8192, 2000),
    (16384, 4000),
    (32768, 8000),
)


def _infer_milli_cpus_from_memory(memory_mib: int) -> int:
    """Return the milli-vCPUs expected for a given memory allocation.

    The result is the value of the smallest tier in
    ``_MEMORY_MIB_TO_MILLI_CPUS`` whose memory bound is greater than or equal
    to ``memory_mib``.

    Args:
        memory_mib: Memory allocated to the function instance, in MiB.
            Must be positive and at most 32768.

    Returns:
        The expected CPU allocation in thousandths of a vCPU
        (e.g. ``583`` for 1024 MiB, ``1000`` for 2048 MiB).

    Raises:
        ValueError: If ``memory_mib`` is not positive, or exceeds the largest
            known tier (32768 MiB).
    """
    # Without this guard a zero or negative size would silently match the
    # smallest tier and yield a plausible-looking CPU count.
    if memory_mib <= 0:
        raise ValueError(
            f"Cloud function memory must be a positive number of MiB, got {memory_mib}"
        )

    for max_memory_mib, milli_cpus in _MEMORY_MIB_TO_MILLI_CPUS:
        if memory_mib <= max_memory_mib:
            return milli_cpus

    raise ValueError("Cloud run supports at most 32768MiB per instance")
@@ -638,6 +643,7 @@ def wrapper(func): cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, bq_metadata=bqrf_metadata, ) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index a70e319747..fcb60bf778 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -88,7 +88,8 @@ def remote_function( cloud_function_vpc_connector_egress_settings: Optional[ Literal["all", "private-ranges-only", "unspecified"] ] = None, - cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_memory_mib: Optional[int] = None, + cloud_function_cpus: Optional[float] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", @@ -112,6 +113,7 @@ def remote_function( cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index c42270c4dd..7ea6e99954 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1536,7 +1536,8 @@ def remote_function( cloud_function_vpc_connector_egress_settings: Optional[ Literal["all", "private-ranges-only", "unspecified"] ] = None, - cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_memory_mib: Optional[int] = None, + cloud_function_cpus: Optional[float] = None, cloud_function_ingress_settings: Literal[ "all", "internal-only", "internal-and-gclb" ] = "internal-only", @@ -1717,6 
+1718,10 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_cpus (float, Optional): + The number of cpus to allocate for the cloud + function (2nd gen) created. + https://docs.cloud.google.com/run/docs/configuring/services/cpu. cloud_function_ingress_settings (str, Optional): Ingress settings controls dictating what traffic can reach the function. Options are: `all`, `internal-only`, or `internal-and-gclb`. @@ -1767,6 +1772,7 @@ def remote_function( cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, cloud_build_service_account=cloud_build_service_account, ) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 4b5d143c15..8d79a77fa4 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -2089,19 +2089,40 @@ def foo_list(x: pandas.Series, y0: float, y1, y2) -> list[str]: @pytest.mark.parametrize( - ("memory_mib_args", "expected_memory"), + ( + "memory_mib_args", + "expected_memory", + "expected_cpus", + ), [ - pytest.param({}, "1024Mi", id="no-set"), - pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"), - pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"), - pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"), - pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"), - pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"), + pytest.param({}, "1024Mi", None, id="no-set"), + pytest.param( + {"cloud_function_memory_mib": None}, 
"1024Mi", None, id="set-None" + ), + pytest.param({"cloud_function_memory_mib": 128}, "128Mi", None, id="set-128"), + pytest.param( + {"cloud_function_memory_mib": 512, "cloud_function_cpus": 0.6}, + "512Mi", + "0.6", + id="set-512", + ), + pytest.param( + {"cloud_function_memory_mib": 1024}, "1024Mi", None, id="set-1024" + ), + pytest.param( + {"cloud_function_memory_mib": 4096, "cloud_function_cpus": 4}, + "4096Mi", + "4", + id="set-4096", + ), + pytest.param( + {"cloud_function_memory_mib": 32768}, "32768Mi", None, id="set-32768" + ), ], ) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_gcf_memory( - session, scalars_dfs, memory_mib_args, expected_memory + session, scalars_dfs, memory_mib_args, expected_memory, expected_cpus ): try: @@ -2117,6 +2138,12 @@ def square(x: int) -> int: name=square_remote.bigframes_cloud_function ) assert gcf.service_config.available_memory == expected_memory + if expected_cpus is not None: + assert gcf.service_config.available_cpu == expected_cpus + if float(gcf.service_config.available_cpu) >= 1.0: + assert gcf.service_config.max_instance_request_concurrency >= float( + gcf.service_config.available_cpu + ) scalars_df, scalars_pandas_df = scalars_dfs