Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion bigframes/functions/_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
# BQ managed functions (@udf) currently only support Python 3.11.
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"

_DEFAULT_FUNCTION_MEMORY_MIB = 1024


class FunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation.
Expand Down Expand Up @@ -402,8 +404,12 @@ def create_cloud_function(
is_row_processor=False,
vpc_connector=None,
vpc_connector_egress_settings="private-ranges-only",
memory_mib=1024,
memory_mib=None,
cpus=None,
ingress_settings="internal-only",
workers=None,
threads=None,
concurrency=None,
):
"""Create a cloud function from the given user defined function."""

Expand Down Expand Up @@ -486,6 +492,8 @@ def create_cloud_function(
function.service_config = functions_v2.ServiceConfig()
if memory_mib is not None:
function.service_config.available_memory = f"{memory_mib}Mi"
if cpus is not None:
function.service_config.available_cpu = str(cpus)
if timeout_seconds is not None:
if timeout_seconds > 1200:
raise bf_formatting.create_exception_with_feedback_link(
Expand Down Expand Up @@ -517,6 +525,20 @@ def create_cloud_function(
function.service_config.service_account_email = (
self._cloud_function_service_account
)
if concurrency:
function.service_config.max_instance_request_concurrency = concurrency

# Functions framework use environment variables to pass config to gunicorn
# See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241
# Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43
env_vars = {}
if workers:
env_vars["WORKERS"] = str(workers)
if threads:
env_vars["THREADS"] = str(threads)
if env_vars:
function.service_config.environment_variables = env_vars
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you know where to put these configs to? Like this function.service_config.environment_variables? Is there a cloud run doc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhh, I just found it with ide auto-complete.
The specific environment variables are from functions framework hooks: see GoogleCloudPlatform/functions-framework-python#241


if ingress_settings not in _INGRESS_SETTINGS_MAP:
raise bf_formatting.create_exception_with_feedback_link(
ValueError,
Expand Down Expand Up @@ -581,6 +603,7 @@ def provision_bq_remote_function(
cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib,
cloud_function_cpus,
cloud_function_ingress_settings,
bq_metadata,
):
Expand Down Expand Up @@ -616,6 +639,21 @@ def provision_bq_remote_function(
)
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)

if cloud_function_memory_mib is None:
cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB

# assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL
# therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker
expected_milli_cpus = (
int(cloud_function_cpus * 1000)
if (cloud_function_cpus is not None)
else _infer_milli_cpus_from_memory(cloud_function_memory_mib)
)
workers = -(expected_milli_cpus // -1000) # ceil(cpus) without invoking floats
threads = 4 # (per worker)
# max concurrency==1 for vcpus < 1 hard limit from cloud run
concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1

# Create the cloud function if it does not exist
if not cf_endpoint:
cf_endpoint = self.create_cloud_function(
Expand All @@ -630,7 +668,11 @@ def provision_bq_remote_function(
vpc_connector=cloud_function_vpc_connector,
vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
memory_mib=cloud_function_memory_mib,
cpus=cloud_function_cpus,
ingress_settings=cloud_function_ingress_settings,
workers=workers,
threads=threads,
concurrency=concurrency,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
Expand Down Expand Up @@ -696,3 +738,25 @@ def get_remote_function_specs(self, remote_function_name):
# Note: list_routines doesn't make an API request until we iterate on the response object.
pass
return (http_endpoint, bq_connection)


def _infer_milli_cpus_from_memory(memory_mib: int) -> int:
# observed values, not formally documented by cloud run functions
if memory_mib <= 128:
return 83
elif memory_mib <= 256:
return 167
elif memory_mib <= 512:
return 333
elif memory_mib <= 1024:
return 583
elif memory_mib <= 2048:
return 1000
elif memory_mib <= 8192:
return 2000
elif memory_mib <= 16384:
return 4000
elif memory_mib <= 32768:
return 8000
else:
raise ValueError("Cloud run supports at most 32768MiB per instance")
8 changes: 7 additions & 1 deletion bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ def remote_function(
cloud_function_vpc_connector_egress_settings: Optional[
Literal["all", "private-ranges-only", "unspecified"]
] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_memory_mib: Optional[int] = None,
cloud_function_cpus: Optional[float] = None,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "internal-only",
Expand Down Expand Up @@ -444,6 +445,10 @@ def remote_function(
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
cloud_function_cpus (float, Optional):
The number of cpus to allocate for the cloud
function (2nd gen) created.
https://docs.cloud.google.com/run/docs/configuring/services/cpu.
cloud_function_ingress_settings (str, Optional):
Ingress settings controls dictating what traffic can reach the
function. Options are: `all`, `internal-only`, or `internal-and-gclb`.
Expand Down Expand Up @@ -638,6 +643,7 @@ def wrapper(func):
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_cpus=cloud_function_cpus,
cloud_function_ingress_settings=cloud_function_ingress_settings,
bq_metadata=bqrf_metadata,
)
Expand Down
4 changes: 3 additions & 1 deletion bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def remote_function(
cloud_function_vpc_connector_egress_settings: Optional[
Literal["all", "private-ranges-only", "unspecified"]
] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_memory_mib: Optional[int] = None,
cloud_function_cpus: Optional[float] = None,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "internal-only",
Expand All @@ -112,6 +113,7 @@ def remote_function(
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_cpus=cloud_function_cpus,
cloud_function_ingress_settings=cloud_function_ingress_settings,
cloud_build_service_account=cloud_build_service_account,
)
Expand Down
8 changes: 7 additions & 1 deletion bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,8 @@ def remote_function(
cloud_function_vpc_connector_egress_settings: Optional[
Literal["all", "private-ranges-only", "unspecified"]
] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_memory_mib: Optional[int] = None,
cloud_function_cpus: Optional[float] = None,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "internal-only",
Expand Down Expand Up @@ -1717,6 +1718,10 @@ def remote_function(
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
cloud_function_cpus (float, Optional):
The number of cpus to allocate for the cloud
function (2nd gen) created.
https://docs.cloud.google.com/run/docs/configuring/services/cpu.
cloud_function_ingress_settings (str, Optional):
Ingress settings controls dictating what traffic can reach the
function. Options are: `all`, `internal-only`, or `internal-and-gclb`.
Expand Down Expand Up @@ -1767,6 +1772,7 @@ def remote_function(
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_cpus=cloud_function_cpus,
cloud_function_ingress_settings=cloud_function_ingress_settings,
cloud_build_service_account=cloud_build_service_account,
)
Expand Down
43 changes: 35 additions & 8 deletions tests/system/large/functions/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -2089,19 +2089,40 @@ def foo_list(x: pandas.Series, y0: float, y1, y2) -> list[str]:


@pytest.mark.parametrize(
("memory_mib_args", "expected_memory"),
(
"memory_mib_args",
"expected_memory",
"expected_cpus",
),
[
pytest.param({}, "1024Mi", id="no-set"),
pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"),
pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"),
pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"),
pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"),
pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"),
pytest.param({}, "1024Mi", None, id="no-set"),
pytest.param(
{"cloud_function_memory_mib": None}, "1024Mi", None, id="set-None"
),
pytest.param({"cloud_function_memory_mib": 128}, "128Mi", None, id="set-128"),
pytest.param(
{"cloud_function_memory_mib": 512, "cloud_function_cpus": 0.6},
"512Mi",
"0.6",
id="set-512",
),
pytest.param(
{"cloud_function_memory_mib": 1024}, "1024Mi", None, id="set-1024"
),
pytest.param(
{"cloud_function_memory_mib": 4096, "cloud_function_cpus": 4},
"4096Mi",
"4",
id="set-4096",
),
pytest.param(
{"cloud_function_memory_mib": 32768}, "32768Mi", None, id="set-32768"
),
],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_gcf_memory(
session, scalars_dfs, memory_mib_args, expected_memory
session, scalars_dfs, memory_mib_args, expected_memory, expected_cpus
):
try:

Expand All @@ -2117,6 +2138,12 @@ def square(x: int) -> int:
name=square_remote.bigframes_cloud_function
)
assert gcf.service_config.available_memory == expected_memory
if expected_cpus is not None:
assert gcf.service_config.available_cpu == expected_cpus
if float(gcf.service_config.available_cpu) >= 1.0:
assert gcf.service_config.max_instance_request_concurrency >= float(
gcf.service_config.available_cpu
)

scalars_df, scalars_pandas_df = scalars_dfs

Expand Down
Loading