Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 105 additions & 27 deletions singlestoredb/functions/ext/plugin/control.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
"""
Control signal dispatch for @@health, @@functions, @@register, @@delete.

Matches the Rust wasm-udf-server's dispatch_control_signal behavior.
Matches the Rust wasm-udf-server's dispatch_control_signal behavior, including
the structured-error-code shape from ADR 0001
(``{"error": "...", "code": "SCREAMING_SNAKE"}`` on errors). The authoritative
catalog of codes lives in ADR 0001 in the ``wasm-udf-server`` repo; the codes
emitted from this module are:

- ``UNKNOWN_SIGNAL`` — unrecognized ``@@``-prefixed signal name.
- ``INTERNAL_ERROR`` — cross-cutting fallback for unexpected handler exceptions.
- ``REGISTER_MISSING_PAYLOAD`` — ``@@register`` called with an empty body.
- ``REGISTER_INVALID_PAYLOAD`` — ``@@register`` body failed JSON parsing or
field validation.
- ``REGISTER_FUNC_EXISTS`` — function with the same name is already registered
and ``replace`` was not requested.
- ``REGISTER_FUNC_NOT_DYNAMIC`` — registration targets a name reserved by a
base/built-in function. This covers both replacing a base function (when
``replace`` is requested) and registering a new function whose name collides
with a base function (even when ``replace`` is not requested).
- ``DELETE_MISSING_PAYLOAD`` — ``@@delete`` called with an empty body.
- ``DELETE_INVALID_PAYLOAD`` — ``@@delete`` body failed JSON parsing or field
validation.
- ``DELETE_FUNC_NOT_REGISTERED`` — target function exists but was not
dynamically registered, so it cannot be deleted.
- ``DELETE_FUNC_NOT_FOUND`` — target function does not exist.

The ``REGISTER_DISABLED`` and ``DELETE_DISABLED`` codes from that catalog have
no call site here because this server has no registration enable/disable flag.
"""
from __future__ import annotations

Expand All @@ -11,6 +36,9 @@
from typing import TYPE_CHECKING

from .registry import describe_functions_json
from .registry import FunctionExistsError
from .registry import FunctionNotDynamicError
from .registry import FunctionNotFoundError

if TYPE_CHECKING:
from .server import SharedRegistry
Expand All @@ -22,7 +50,19 @@
class ControlResult:
"""Result of a control signal dispatch."""
ok: bool
data: str # JSON response on success, error message on failure
# JSON response. On success (``ok=True``) this is a handler-specific
# document such as ``{"status":"ok"}`` or ``{"functions":[...]}``. On
# failure (``ok=False``) this is the ADR 0001 error shape
# ``{"error":"...","code":"..."}``.
data: str


def _err(message: str, code: str) -> ControlResult:
"""Build an error ControlResult with the ADR 0001 JSON shape."""
return ControlResult(
ok=False,
data=json.dumps({'error': message, 'code': code}),
)


def dispatch_control_signal(
Expand All @@ -46,12 +86,12 @@ def dispatch_control_signal(
request_data, shared_registry, pipe_write_fd,
)
else:
return ControlResult(
ok=False,
data=f'Unknown control signal: {signal_name}',
return _err(
f'Unknown control signal: {signal_name}',
'UNKNOWN_SIGNAL',
)
except Exception as e:
return ControlResult(ok=False, data=str(e))
return _err(str(e), 'INTERNAL_ERROR')


def _handle_health() -> ControlResult:
Expand Down Expand Up @@ -83,39 +123,59 @@ def _handle_register(
own registry and re-fork all workers.
"""
if not request_data:
return ControlResult(ok=False, data='Missing registration payload')
return _err('Missing registration payload', 'REGISTER_MISSING_PAYLOAD')

try:
body = json.loads(request_data)
except json.JSONDecodeError as e:
return ControlResult(ok=False, data=f'Invalid JSON: {e}')
except (json.JSONDecodeError, UnicodeDecodeError) as e:
return _err(f'Invalid JSON: {e}', 'REGISTER_INVALID_PAYLOAD')
Comment thread
kesmit13 marked this conversation as resolved.

Comment thread
kesmit13 marked this conversation as resolved.
if not isinstance(body, dict):
return _err(
'Request body must be a JSON object',
'REGISTER_INVALID_PAYLOAD',
)

function_name = body.get('name')
if not function_name:
return ControlResult(
ok=False, data='Missing required field: name',
return _err(
'Missing required field: name', 'REGISTER_INVALID_PAYLOAD',
)
if not isinstance(function_name, str):
return _err(
'Field "name" must be a string', 'REGISTER_INVALID_PAYLOAD',
)

args = body.get('args')
if not isinstance(args, list):
return ControlResult(
ok=False, data='Missing required field: args (must be an array)',
return _err(
'Missing required field: args (must be an array)',
'REGISTER_INVALID_PAYLOAD',
)

returns = body.get('returns')
if not isinstance(returns, list):
return ControlResult(
ok=False,
data='Missing required field: returns (must be an array)',
return _err(
'Missing required field: returns (must be an array)',
'REGISTER_INVALID_PAYLOAD',
)

func_body = body.get('body')
if not func_body:
return ControlResult(
ok=False, data='Missing required field: body',
return _err(
'Missing required field: body', 'REGISTER_INVALID_PAYLOAD',
)
Comment thread
kesmit13 marked this conversation as resolved.
if not isinstance(func_body, str):
return _err(
'Field "body" must be a string', 'REGISTER_INVALID_PAYLOAD',
)

replace = body.get('replace', False)
if not isinstance(replace, bool):
return _err(
'Field "replace" must be a boolean',
'REGISTER_INVALID_PAYLOAD',
)

# Build signature JSON matching describe-functions schema
signature = json.dumps({
Expand All @@ -126,8 +186,12 @@ def _handle_register(

try:
shared_registry.create_function(signature, func_body, replace)
except Exception as e:
return ControlResult(ok=False, data=str(e))
except FunctionExistsError as e:
return _err(str(e), 'REGISTER_FUNC_EXISTS')
except FunctionNotDynamicError as e:
return _err(str(e), 'REGISTER_FUNC_NOT_DYNAMIC')
except (ValueError, SyntaxError, TypeError) as e:
return _err(str(e), 'REGISTER_INVALID_PAYLOAD')
Comment thread
cursor[bot] marked this conversation as resolved.

# Notify main process so it can re-fork workers with updated state
if pipe_write_fd is not None:
Expand All @@ -151,23 +215,37 @@ def _handle_delete(
) -> ControlResult:
"""Handle @@delete: delete a dynamically registered function."""
if not request_data:
return ControlResult(ok=False, data='Missing deletion payload')
return _err('Missing deletion payload', 'DELETE_MISSING_PAYLOAD')

try:
body = json.loads(request_data)
except json.JSONDecodeError as e:
return ControlResult(ok=False, data=f'Invalid JSON: {e}')
except (json.JSONDecodeError, UnicodeDecodeError) as e:
return _err(f'Invalid JSON: {e}', 'DELETE_INVALID_PAYLOAD')
Comment thread
kesmit13 marked this conversation as resolved.

Comment thread
kesmit13 marked this conversation as resolved.
if not isinstance(body, dict):
return _err(
'Request body must be a JSON object',
'DELETE_INVALID_PAYLOAD',
)

function_name = body.get('name')
if not function_name:
return ControlResult(
ok=False, data='Missing required field: name',
return _err(
'Missing required field: name', 'DELETE_INVALID_PAYLOAD',
)
Comment thread
kesmit13 marked this conversation as resolved.
if not isinstance(function_name, str):
return _err(
'Field "name" must be a string', 'DELETE_INVALID_PAYLOAD',
)

try:
shared_registry.delete_function(function_name)
except ValueError as e:
return ControlResult(ok=False, data=str(e))
except FunctionNotDynamicError as e:
return _err(str(e), 'DELETE_FUNC_NOT_REGISTERED')
except FunctionNotFoundError as e:
return _err(str(e), 'DELETE_FUNC_NOT_FOUND')
except Exception as e:
return _err(str(e), 'INTERNAL_ERROR')

# Notify main process so it can re-fork workers with updated state
if pipe_write_fd is not None:
Expand Down
60 changes: 50 additions & 10 deletions singlestoredb/functions/ext/plugin/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ def setup_logging(level: int = logging.INFO) -> None:
logger = logging.getLogger('udf_handler')


class FunctionExistsError(ValueError):
"""Raised when a function name is already registered and replace=False."""


class FunctionNotDynamicError(ValueError):
"""Raised when an operation (register, replace, or delete) targets a
function name reserved by a base/built-in (non-dynamic) function. This
covers registering a new function whose name collides with a base
function as well as attempts to replace or delete a base function."""


class FunctionNotFoundError(ValueError):
"""Raised when the target function does not exist (delete only)."""


class FunctionRegistry:
"""Registry of discovered UDF functions."""

Expand Down Expand Up @@ -386,18 +401,39 @@ def create_function(
'signature JSON must contain a "name" field',
)

for kind in ('args', 'returns'):
items = sig.get(kind, [])
if not isinstance(items, list):
raise ValueError(f'"{kind}" must be a list')
for i, item in enumerate(items):
if not isinstance(item, dict):
raise ValueError(
f'"{kind}[{i}]" must be a JSON object',
)
dtype = item.get('dtype')
if not isinstance(dtype, str):
raise ValueError(
f'"{kind}[{i}].dtype" must be a string',
)
if kind == 'args':
name = item.get('name')
if not isinstance(name, str) or not name:
raise ValueError(
f'"{kind}[{i}].name" must be a non-empty string',
)

if func_name in self._base_function_names:
raise FunctionNotDynamicError(
f"Cannot register '{func_name}': "
f'name is reserved by a built-in (non-dynamic) function',
)

if not replace and func_name in self.functions:
raise ValueError(
raise FunctionExistsError(
f'Function "{func_name}" already exists '
f'(use replace=true to overwrite)',
)
Comment thread
kesmit13 marked this conversation as resolved.

if func_name in self._base_function_names:
raise ValueError(
f"Cannot replace '{func_name}': "
f'not a dynamically registered function',
)

if replace and func_name in self.functions:
del self.functions[func_name]

Expand Down Expand Up @@ -438,7 +474,11 @@ def delete_function(self, signature_json: str) -> None:
element schema (must contain a 'name' field). Currently
only the name is used for matching.

Raises ValueError if the function does not exist.
Raises:
ValueError: if the signature JSON is missing a "name" field.
FunctionNotFoundError: if no function with that name is registered.
FunctionNotDynamicError: if the function exists but was not
dynamically registered (e.g., a built-in).
"""
sig = json.loads(signature_json)
name = sig.get('name')
Expand All @@ -447,9 +487,9 @@ def delete_function(self, signature_json: str) -> None:
'signature JSON must contain a "name" field',
)
if name not in self.functions:
raise ValueError(f"Function '{name}' not found")
raise FunctionNotFoundError(f"Function '{name}' not found")
if name in self._base_function_names:
raise ValueError(
raise FunctionNotDynamicError(
f"Cannot delete '{name}': not a dynamically registered function",
Comment thread
kesmit13 marked this conversation as resolved.
)
del self.functions[name]
Expand Down
11 changes: 7 additions & 4 deletions singlestoredb/functions/ext/plugin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

from .connection import _write_all_fd
from .connection import handle_connection
from .registry import FunctionNotDynamicError
from .registry import FunctionNotFoundError
from .registry import FunctionRegistry

logger = logging.getLogger('plugin.server')
Expand Down Expand Up @@ -131,8 +133,9 @@ def create_function(
def delete_function(self, function_name: str) -> None:
"""Delete a dynamically registered function by name.

Raises ValueError if the function was not dynamically registered
or does not exist at all.
Raises FunctionNotDynamicError if the function exists as a base
(non-dynamic) function, FunctionNotFoundError if it does not
exist at all.
"""
with self._lock:
# Find matching code blocks by parsing signature_json
Expand All @@ -148,11 +151,11 @@ def delete_function(self, function_name: str) -> None:
self._base_registry is not None
and function_name in self._base_registry.functions
):
raise ValueError(
raise FunctionNotDynamicError(
f"Cannot delete '{function_name}': "
f'not a dynamically registered function',
)
raise ValueError(
raise FunctionNotFoundError(
f"Function '{function_name}' not found",
)

Expand Down
Loading
Loading