Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DATABASE_NAME=baserow
# BASEROW_AUTOMATION_HISTORY_PAGE_SIZE_LIMIT=
# BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=
# BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=
# BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS=
# BASEROW_EXTRA_ALLOWED_HOSTS=
# ADDITIONAL_APPS=
Expand Down
6 changes: 6 additions & 0 deletions backend/src/baserow/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ def __setitem__(self, key, value):
# Seconds the per-workflow "recent runs" counter lives in the cache; this is
# the window used when rate limiting workflow runs.
AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS = int(
    os.getenv("BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS", 5)
)
# Seconds during which repeated "rate limited" workflow history entries are
# suppressed. Defaults to the rate limit window above when the env variable
# is not set.
AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS = int(
    os.getenv(
        "BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS",
        AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS,
    )
)
# Number of consecutive errored runs after which a workflow is disabled.
AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS = int(
    os.getenv("BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS", 5)
)
Expand Down
7 changes: 6 additions & 1 deletion backend/src/baserow/contrib/automation/history/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ def create_workflow_history(
workflow: AutomationWorkflow,
started_on: datetime,
is_test_run: bool,
status: HistoryStatusChoices = HistoryStatusChoices.STARTED,
completed_on: Optional[datetime] = None,
message: str = "",
) -> AutomationWorkflowHistory:
return AutomationWorkflowHistory.objects.create(
workflow=workflow,
started_on=started_on,
is_test_run=is_test_run,
status=HistoryStatusChoices.STARTED,
status=status,
completed_on=completed_on,
message=message,
)
55 changes: 54 additions & 1 deletion backend/src/baserow/contrib/automation/workflows/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
)

WORKFLOW_RATE_LIMIT_CACHE_PREFIX = "automation_workflow_{}"
WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX = "automation_workflow_history_{}"
AUTOMATION_WORKFLOW_CACHE_LOCK_SECONDS = 5

tracer = trace.get_tracer(__name__)
Expand Down Expand Up @@ -686,11 +687,47 @@ def before_run(self, workflow: AutomationWorkflow) -> None:
self.reset_workflow_temporary_states(workflow)

self._check_too_many_errors(workflow)
self._check_is_rate_limited(workflow.id)

def _get_rate_limit_cache_key(self, workflow_id: int) -> str:
    # Cache key that tracks recent runs of this workflow for rate limiting.
    return WORKFLOW_RATE_LIMIT_CACHE_PREFIX.format(workflow_id)

def _get_workflow_history_rate_limit_cache_key(self, workflow_id: int) -> str:
    # Cache key that suppresses duplicate "rate limited" history entries
    # for this workflow within the configured expiry window.
    return WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX.format(workflow_id)

def _should_create_rate_limited_workflow_history(self, workflow_id: int) -> bool:
    """
    Decide whether a "rate limited" workflow history entry should be
    created for the given workflow.

    Only the first call within the cache expiry window returns True;
    every later call in the same window returns False, so at most one
    history entry is written per window.
    """

    cache_key = self._get_workflow_history_rate_limit_cache_key(workflow_id)
    create_history = False

    def _mark_first_check(history_exists):
        # Flip the flag only on the first check of this window. Always
        # return True so that update() stores the key and subsequent
        # calls see history_exists as truthy.
        nonlocal create_history
        if not history_exists:
            create_history = True
        return True

    global_cache.update(
        cache_key,
        callback=_mark_first_check,
        default_value=lambda: False,
        timeout=settings.AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS,
    )

    return create_history

def _check_is_rate_limited(self, workflow_id: int) -> None:
"""Uses a global cache key to track recent runs for the given workflow."""

Expand Down Expand Up @@ -835,6 +872,22 @@ def async_start_workflow(
:param event_payload: The payload from the action.
"""

try:
self._check_is_rate_limited(workflow.id)
except AutomationWorkflowRateLimited as e:
if self._should_create_rate_limited_workflow_history(workflow.id):
original_workflow = self.get_original_workflow(workflow)
now = timezone.now()
AutomationHistoryHandler().create_workflow_history(
original_workflow,
is_test_run=original_workflow == workflow,
started_on=now,
completed_on=now,
message=str(e),
status=HistoryStatusChoices.ERROR,
)
return

start_workflow_celery_task.delay(
workflow.id,
event_payload,
Expand Down
29 changes: 29 additions & 0 deletions backend/src/baserow/contrib/database/api/fields/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,40 @@ class UniqueRowValuesSerializer(serializers.Serializer):
values = serializers.ListSerializer(child=serializers.CharField())


@extend_schema_field(OpenApiTypes.INT)
class CollaboratorField(serializers.Field):
    """
    A serializer field that accepts a collaborator id as an int, a string of
    decimal digits, or a dict with an "id" key, and normalizes it to an int.
    """

    def to_internal_value(self, data):
        # Reject booleans explicitly: bool is a subclass of int, so without
        # this check isinstance(True, int) would let True through as id 1.
        if isinstance(data, int) and not isinstance(data, bool):
            return data

        if isinstance(data, str) and data.isdigit():
            try:
                return int(data)
            except ValueError:
                # Unicode digits such as "²" pass isdigit() but are not
                # valid for int(); fall through to the validation error
                # instead of raising an uncaught ValueError.
                pass
        elif isinstance(data, dict):
            try:
                return int(data["id"])
            except (KeyError, TypeError, ValueError):
                pass

        raise serializers.ValidationError(
            "Expected an integer or an object with an 'id' field",
            code="invalid",
        )

    def to_representation(self, value):
        # Values are already plain ints; emit them unchanged.
        return value


class CollaboratorSerializer(serializers.Serializer):
    # Response shape for a single collaborator: the user id plus a
    # read-only display name sourced from the user's first_name attribute.
    id = serializers.IntegerField()
    name = serializers.CharField(source="first_name", read_only=True)


class CollaboratorRequestSerializer(serializers.ListSerializer):
    # Request-side list of collaborators; each item is validated by
    # CollaboratorField, which accepts an int id or a dict with an "id" key.
    child = CollaboratorField()


class AvailableCollaboratorsSerializer(serializers.ListField):
def __init__(self, **kwargs):
kwargs["child"] = CollaboratorSerializer()
Expand Down
15 changes: 5 additions & 10 deletions backend/src/baserow/contrib/database/fields/field_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6744,18 +6744,13 @@ def can_represent_collaborators(self, field):
return True

def get_serializer_field(self, instance, **kwargs):
required = kwargs.pop("required", False)
field_serializer = CollaboratorSerializer(
**{
"required": required,
"allow_null": False,
**kwargs,
}
)
return serializers.ListSerializer(
child=field_serializer, required=required, **kwargs
from baserow.contrib.database.api.fields.serializers import (
CollaboratorRequestSerializer,
)

kwargs.setdefault("required", False)
return CollaboratorRequestSerializer(**kwargs)

def get_search_expression(
self, field: MultipleCollaboratorsField, queryset: QuerySet
) -> Expression:
Expand Down
13 changes: 13 additions & 0 deletions backend/tests/baserow/contrib/automation/history/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from baserow.contrib.automation.history.models import AutomationWorkflowHistory


def assert_history(workflow, expected_count, expected_status, expected_msg):
    """
    Assert that *workflow* has exactly *expected_count* AutomationWorkflowHistory
    rows and, when any exist, that the first row has the expected status and
    message.

    NOTE(review): only histories[0] is inspected; this relies on the
    queryset's default ordering putting the relevant entry first — confirm
    against the model's Meta ordering.
    """

    histories = AutomationWorkflowHistory.objects.filter(workflow=workflow)
    assert len(histories) == expected_count
    if expected_count > 0:
        history = histories[0]
        assert history.workflow == workflow
        assert history.status == expected_status
        assert history.message == expected_msg
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler
from baserow.core.trash.handler import TrashHandler
from tests.baserow.contrib.automation.history.utils import assert_history

WORKFLOWS_MODULE = "baserow.contrib.automation.workflows"
HANDLER_MODULE = f"{WORKFLOWS_MODULE}.handler"
Expand Down Expand Up @@ -569,6 +570,71 @@ def test_check_is_rate_limited_raises_if_above_limit():
)


@pytest.mark.django_db
@override_settings(
    AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5,
    AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=2,
)
@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task")
def test_workflow_rate_limiter_is_checked_before_starting_celery_task(
    mock_celery_task, data_fixture
):
    """Rate limiting is applied before the celery task is ever queued."""

    user = data_fixture.create_user()

    draft_workflow = data_fixture.create_automation_workflow(user=user)
    live_workflow = data_fixture.create_automation_workflow(
        state=WorkflowState.LIVE, user=user
    )
    live_workflow.automation.published_from = draft_workflow
    live_workflow.automation.save()

    handler = AutomationWorkflowHandler()
    rate_limited_error = "The workflow was rate limited due to too many recent runs."

    with freeze_time("2026-01-26 13:00:00"):
        # The first two runs are within the limit and are queued normally,
        # leaving no history entries behind.
        for _ in range(2):
            handler.async_start_workflow(live_workflow)
        assert mock_celery_task.delay.call_count == 2
        assert_history(draft_workflow, 0, "error", rate_limited_error)

        # The third run exceeds the limit: nothing new is queued and a
        # single error history entry is recorded instead.
        handler.async_start_workflow(live_workflow)
        assert mock_celery_task.delay.call_count == 2
        assert_history(draft_workflow, 1, "error", rate_limited_error)


@pytest.mark.django_db
@override_settings(
    AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5,
)
@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task")
def test_should_create_rate_limited_workflow_history(mock_celery_task, data_fixture):
    """Only the first check per expiry window allows creating a history."""

    workflow_id = 99999
    handler = AutomationWorkflowHandler()
    should_create = handler._should_create_rate_limited_workflow_history

    with freeze_time("2026-01-26 13:00:00"):
        # The first check of the window permits creating a history entry.
        assert should_create(workflow_id) is True

        # Repeated checks inside the same window are suppressed.
        assert should_create(workflow_id) is False
        assert should_create(workflow_id) is False

    with freeze_time("2026-01-26 13:00:06"):
        # The 5 second cache window has expired, so creation is allowed
        # again exactly once...
        assert should_create(workflow_id) is True

        # ...and follow-up checks in the new window are suppressed.
        assert should_create(workflow_id) is False


@pytest.mark.django_db
def test_disable_workflow_disables_original_workflow(data_fixture):
original_workflow = data_fixture.create_automation_workflow()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from unittest.mock import patch

from django.test import override_settings

import pytest

from baserow.contrib.automation.history.models import AutomationWorkflowHistory
from baserow.contrib.automation.workflows.constants import WorkflowState
from baserow.contrib.automation.workflows.exceptions import (
AutomationWorkflowRateLimited,
AutomationWorkflowTooManyErrors,
)
from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler
Expand Down Expand Up @@ -110,63 +107,6 @@ def test_run_workflow_unexpected_error_creates_workflow_history(
mock_logger.exception.assert_called_once_with(error_msg)


def assert_history(workflow, expected_count, expected_status, expected_msg):
histories = AutomationWorkflowHistory.objects.filter(workflow=workflow)
assert len(histories) == expected_count
history = histories[0]
assert history.workflow == workflow
assert history.status == expected_status
assert history.message == expected_msg


@pytest.mark.django_db
@override_settings(
AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS=3,
AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=10,
)
@patch(
"baserow.contrib.automation.workflows.handler.AutomationWorkflowHandler._check_is_rate_limited"
)
@patch("baserow.contrib.automation.nodes.handler.AutomationNodeHandler.dispatch_node")
def test_run_workflow_disables_workflow_if_too_many_errors(
mock_dispatch_node, mock_is_rate_limited, data_fixture
):
mock_is_rate_limited.side_effect = AutomationWorkflowRateLimited(
"mock rate limited error"
)

original_workflow = data_fixture.create_automation_workflow()
published_workflow = data_fixture.create_automation_workflow(
state=WorkflowState.LIVE
)
published_workflow.automation.published_from = original_workflow
published_workflow.automation.save()

# The first 3 runs should just be an error
for i in range(3):
start_workflow_celery_task(published_workflow.id, False, None)
mock_dispatch_node.assert_not_called()
assert_history(original_workflow, i + 1, "error", "mock rate limited error")
original_workflow.refresh_from_db()
published_workflow.refresh_from_db()
assert original_workflow.state == WorkflowState.DRAFT
assert published_workflow.state == WorkflowState.LIVE

# The fourth run should disable the workflow due to too many errors
start_workflow_celery_task(published_workflow.id, False, None)
mock_dispatch_node.assert_not_called()
assert_history(
original_workflow,
4,
"disabled",
f"The workflow {original_workflow.id} was disabled due to too many consecutive errors.",
)
original_workflow.refresh_from_db()
published_workflow.refresh_from_db()
assert original_workflow.state == WorkflowState.DISABLED
assert published_workflow.state == WorkflowState.DISABLED


@pytest.mark.django_db
@patch(
"baserow.contrib.automation.workflows.handler.AutomationWorkflowHandler._check_too_many_errors"
Expand Down
Loading
Loading