diff --git a/.env.example b/.env.example index 2ab5925dc9..80d89d1c3d 100644 --- a/.env.example +++ b/.env.example @@ -69,6 +69,7 @@ DATABASE_NAME=baserow # BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS= # BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS= # BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS= +# BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS= # BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS= # BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES= # BASEROW_EXTRA_ALLOWED_HOSTS= diff --git a/backend/justfile b/backend/justfile index c3dfce6130..a87d620520 100644 --- a/backend/justfile +++ b/backend/justfile @@ -72,6 +72,9 @@ venv_dir := env("UV_PROJECT_ENVIRONMENT", justfile_directory() / "../.venv") # Export for uv to use export UV_PROJECT_ENVIRONMENT := venv_dir +# Prevent stdout buffering +export PYTHONUNBUFFERED := "1" + # Use --active to respect VIRTUAL_ENV, since venv may be in parent dir uv_run := "uv run --active" diff --git a/backend/src/baserow/config/settings/base.py b/backend/src/baserow/config/settings/base.py index d338d7b833..86213e7f05 100644 --- a/backend/src/baserow/config/settings/base.py +++ b/backend/src/baserow/config/settings/base.py @@ -835,6 +835,9 @@ def __setitem__(self, key, value): AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS = int( os.getenv("BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS", 5) ) +AUTOMATION_WORKFLOW_TIMEOUT_HOURS = int( + os.getenv("BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS", 24) +) AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS = int( os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS", 30) ) diff --git a/backend/src/baserow/contrib/automation/nodes/handler.py b/backend/src/baserow/contrib/automation/nodes/handler.py index 5391089162..1e6475eb5c 100644 --- a/backend/src/baserow/contrib/automation/nodes/handler.py +++ b/backend/src/baserow/contrib/automation/nodes/handler.py @@ -396,10 +396,6 @@ def dispatch_node( dispatch, otherwise returns None. 
""" - from baserow.contrib.automation.workflows.handler import ( - AutomationWorkflowHandler, - ) - history_handler = AutomationHistoryHandler() try: @@ -450,9 +446,8 @@ def dispatch_node( self._handle_workflow_error(node_history, error) return None except Exception as e: - original_workflow = AutomationWorkflowHandler().get_original_workflow( - node.workflow - ) + original_workflow = node.workflow.get_original() + error = ( f"Unexpected error while running workflow {original_workflow.id}. " f"Error: {str(e)}" @@ -467,12 +462,6 @@ def dispatch_node( # Use the normalized iteration index from the context. iteration_index = dispatch_context.current_iterations[parent_nodes[-1].id] - history_handler.create_node_result( - node_history=node_history, - result=dispatch_result.data, - iteration=iteration_index, - ) - # Return early if this is a simulation as we've reached the # simulated node. if until_node := simulate_until_node: @@ -481,6 +470,12 @@ def dispatch_node( automation_node_updated.send(self, user=None, node=until_node) return None + history_handler.create_node_result( + node_history=node_history, + result=dispatch_result.data, + iteration=iteration_index, + ) + to_chain = [] if children := node.get_children(): node_data = dispatch_result.data["results"] diff --git a/backend/src/baserow/contrib/automation/nodes/tasks.py b/backend/src/baserow/contrib/automation/nodes/tasks.py index 0c80c29171..9a063637ae 100644 --- a/backend/src/baserow/contrib/automation/nodes/tasks.py +++ b/backend/src/baserow/contrib/automation/nodes/tasks.py @@ -12,7 +12,7 @@ def dispatch_node_celery_task( node_id: int, history_id: int, current_iterations: Optional[Dict[int, int]] = None, -): +) -> Signature | None: from baserow.contrib.automation.nodes.handler import AutomationNodeHandler # The atomic context should only wrap the dispatch_node() call. If @@ -37,3 +37,5 @@ def _dispatch(): # by a worker (which again calls dispatch_node_celery_task). 
if isinstance(result, Signature): return self.replace(result) + + return None diff --git a/backend/src/baserow/contrib/automation/workflows/handler.py b/backend/src/baserow/contrib/automation/workflows/handler.py index 899cb29e17..9784cf1d18 100644 --- a/backend/src/baserow/contrib/automation/workflows/handler.py +++ b/backend/src/baserow/contrib/automation/workflows/handler.py @@ -1,16 +1,16 @@ from collections import defaultdict from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional from zipfile import ZipFile from django.conf import settings from django.contrib.auth.models import AbstractUser from django.core.files.storage import Storage -from django.db import IntegrityError +from django.db import IntegrityError, transaction from django.db.models import QuerySet from django.utils import timezone -from celery.canvas import chain +from celery.canvas import Signature, chain from opentelemetry import trace from baserow.contrib.automation.automation_dispatch_context import ( @@ -132,26 +132,14 @@ def _get_published_workflow( return _get_published_workflow(workflow) - def get_original_workflow( - self, workflow: AutomationWorkflow - ) -> Optional[AutomationWorkflow]: - """ - Gets the original workflow related to the provided published - AutomationWorkflow instance. - - If the workflow isn't published but allow_test_run_until is set, - it indicates that the provided workflow is the one being run. Thus the - same workflow is returned. + def _invalidate_workflow_caches(self, workflow: AutomationWorkflow) -> None: + original_workflow = workflow.get_original() - :param workflow: The published workflow for which the original version - should be returned. - :return: The original workflow, if it exists. 
- """ - - if workflow.automation.published_from_id: - return workflow.automation.published_from - else: - return workflow + global_cache.invalidate(f"wa_published_workflow_{original_workflow.id}") + global_cache.invalidate(self._get_rate_limit_cache_key(original_workflow)) + global_cache.invalidate( + self._get_workflow_history_rate_limit_cache_key(original_workflow) + ) def get_workflows( self, automation: Automation, base_queryset: Optional[QuerySet] = None @@ -674,164 +662,25 @@ def publish( duplicate_automation.published_from = workflow duplicate_automation.save(update_fields=["published_from"]) - return duplicate_automation.workflows.first() - - def before_run(self, workflow: AutomationWorkflow) -> None: - """ - Runs pre-flight checks before a workflow is allowed to run. - - Each check may raise a subclass of the AutomationWorkflowBeforeRunError error. - """ - - # If we don't come from an event, we need to reset the states - self.reset_workflow_temporary_states(workflow) - - self._check_too_many_errors(workflow) - - self._clear_old_history(workflow) - - def _clear_old_history(self, workflow: AutomationWorkflow) -> None: - """ - Clear any old history entries related to the workflow. - - It will delete any history entries that are older than MAX_HISTORY_DAYS and only - keep the most recent MAX_HISTORY_ENTRIES entries. 
- """ - - oldest_history_date = timezone.now() - timedelta( - days=settings.AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS - ) - workflow.workflow_histories.filter(started_on__lt=oldest_history_date).delete() - - history_ids_to_keep = list( - workflow.workflow_histories.order_by("-started_on").values_list( - "id", flat=True - )[: settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES] - ) - workflow.workflow_histories.exclude(id__in=history_ids_to_keep).delete() - - def _get_rate_limit_cache_key(self, workflow_id: int) -> str: - return WORKFLOW_RATE_LIMIT_CACHE_PREFIX.format(workflow_id) - - def _get_workflow_history_rate_limit_cache_key(self, workflow_id: int) -> str: - return WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX.format(workflow_id) - - def _should_create_rate_limited_workflow_history(self, workflow_id: int) -> bool: - """ - Checks if the workflow history should be created when rate limited. - - Returns True if the history should be created, False otherwise. - """ - - cache_key = self._get_workflow_history_rate_limit_cache_key(workflow_id) - - should_create_history = False - - def _should_create_history(history_exists): - """ - Sets should_create_history to True if this is the first time - we're checking the workflow history in this window. - - Returns True because we always want to set the cache key - when update() is called. 
- """ - - if not history_exists: - nonlocal should_create_history - should_create_history = True - return True - - global_cache.update( - cache_key, - callback=_should_create_history, - default_value=lambda: False, - timeout=settings.AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS, - ) - - return should_create_history - - def _check_is_rate_limited(self, workflow_id: int) -> None: - """Uses a global cache key to track recent runs for the given workflow.""" - - expiry_seconds = settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS - cache_key = self._get_rate_limit_cache_key(workflow_id) - - global_cache.update( - cache_key, - self._check_is_rate_limited_value, - default_value=lambda: [], - timeout=expiry_seconds, - ) - - def _check_is_rate_limited_value(self, data: List[datetime]) -> List[datetime]: - """ - Given a list of recent workflow run timestamps, determines whether - the workflow run should be rate limited. If so, raises the - AutomationWorkflowRateLimited error. - """ - - now = timezone.now() - expiry_seconds = settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS - start_window = now - timedelta(seconds=expiry_seconds) - - # Check the number of past runs that are in the window - runs_in_window = [ - timestamp - for timestamp in data - if isinstance(timestamp, datetime) and timestamp > start_window - ] - - if len(runs_in_window) >= settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS: - raise AutomationWorkflowRateLimited( - "The workflow was rate limited due to too many recent runs." - ) - - runs_in_window.append(now) - - return runs_in_window - - def _check_too_many_errors(self, workflow: AutomationWorkflow) -> None: - """ - Checks if the given workflow has too many consecutive errors. If so, - raises AutomationWorkflowTooManyErrors. 
- """ - - max_errors = settings.AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS - - statuses = ( - AutomationWorkflowHistory.objects.filter(workflow=workflow) - .order_by("-started_on") - # +1 because we will ignore the latest entry, since the workflow may - # have just started. - .values_list("status", flat=True)[: max_errors + 1] - ) - - # Ignore the latest status if it is 'started' - if statuses and statuses[0] == HistoryStatusChoices.STARTED: - statuses = statuses[1:] - - # Not enough history to exceed threshold - if len(statuses) < max_errors: - return + self._invalidate_workflow_caches(workflow) - if all(status == HistoryStatusChoices.ERROR for status in statuses): - raise AutomationWorkflowTooManyErrors( - f"The workflow {workflow.id} was disabled due to too " - "many consecutive errors." - ) + return duplicate_automation.workflows.first() def disable_workflow(self, workflow: AutomationWorkflow) -> None: """ - Disable the provided workflow, as well as the original workflow if it exists. + Disable the provided workflow, as well as the original workflow. 
""" - workflow_ids = {workflow.id} - if original_workflow := self.get_original_workflow(workflow): - workflow_ids.add(original_workflow.id) + original_workflow = workflow.get_original() - AutomationWorkflow.objects.filter(id__in=workflow_ids).update( - state=WorkflowState.DISABLED - ) + # The two workflows are always different because we call it only for published + # workflows + workflow.state = WorkflowState.DISABLED + workflow.save(update_fields=["state"]) + original_workflow.state = WorkflowState.DISABLED + original_workflow.save(update_fields=["state"]) + + automation_workflow_updated.send(self, user=None, workflow=original_workflow) def set_workflow_temporary_states(self, workflow, simulate_until_node=None): """ @@ -882,40 +731,6 @@ def reset_workflow_temporary_states(self, workflow): workflow.save(update_fields=fields_to_save) automation_workflow_updated.send(self, user=None, workflow=workflow) - def async_start_workflow( - self, - workflow: AutomationWorkflow, - event_payload: Optional[List[Dict]] = None, - ) -> None: - """ - Runs the provided workflow in a celery task. - - :param workflow: The AutomationWorkflow ID that should be executed. - :param event_payload: The payload from the action. 
- """ - - try: - self._check_is_rate_limited(workflow.id) - except AutomationWorkflowRateLimited as e: - if self._should_create_rate_limited_workflow_history(workflow.id): - original_workflow = self.get_original_workflow(workflow) - now = timezone.now() - AutomationHistoryHandler().create_workflow_history( - original_workflow, - is_test_run=original_workflow == workflow, - started_on=now, - completed_on=now, - message=str(e), - status=HistoryStatusChoices.ERROR, - ) - return - - start_workflow_celery_task.delay( - workflow.id, - event_payload, - simulate_until_node_id=workflow.simulate_until_node_id, - ) - def toggle_test_run( self, workflow: AutomationWorkflow, simulate_until_node: bool = None ): @@ -968,62 +783,279 @@ def toggle_test_run( # except if we are updating the trigger sample data by itself self.async_start_workflow(workflow) - def start_workflow( + def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None: + """ + Clear any old history entries related to the workflow. + + It will delete any history entries that are older than MAX_HISTORY_DAYS and only + keep the most recent MAX_HISTORY_ENTRIES entries. + """ + + oldest_history_date = timezone.now() - timedelta( + days=settings.AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS + ) + original_workflow.workflow_histories.exclude( + status=HistoryStatusChoices.STARTED + ).filter(started_on__lt=oldest_history_date).delete() + + history_ids_to_keep = list( + original_workflow.workflow_histories.order_by("-started_on").values_list( + "id", flat=True + )[: settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES] + ) + original_workflow.workflow_histories.exclude( + status=HistoryStatusChoices.STARTED + ).exclude(id__in=history_ids_to_keep).delete() + + def _mark_failure_for_timed_out_history( + self, original_workflow: AutomationWorkflow + ) -> None: + """ + If an history entry is still not finished after a certain duration, this execution + is marked as failed. 
+ """ + + max_history_date = timezone.now() - timedelta( + hours=settings.AUTOMATION_WORKFLOW_TIMEOUT_HOURS + ) + original_workflow.workflow_histories.filter( + status=HistoryStatusChoices.STARTED, started_on__lt=max_history_date + ).update( + status=HistoryStatusChoices.ERROR, + message="This workflow took too long and was timed out.", + ) + + def _get_rate_limit_cache_key(self, original_workflow: AutomationWorkflow) -> str: + return WORKFLOW_RATE_LIMIT_CACHE_PREFIX.format(original_workflow.id) + + def _get_workflow_history_rate_limit_cache_key( + self, original_workflow: AutomationWorkflow + ) -> str: + return WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX.format(original_workflow.id) + + def _get_histories_for_current_workflow_version(self, workflow: AutomationWorkflow): + histories = AutomationHistoryHandler().get_workflow_histories( + workflow.get_original() + ) + + if workflow != workflow.get_original(): + histories = histories.filter(started_on__gte=workflow.created_on) + + return histories + + def _check_is_rate_limited(self, workflow: AutomationWorkflow) -> bool: + """Uses a global cache key to track recent runs for the given workflow.""" + + original_workflow = workflow.get_original() + + cache_key = self._get_rate_limit_cache_key(original_workflow) + rate_cache_timeout = ( + settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS + ) + + now = timezone.now() + + def update_last_run_cache(previous_last_runs): + """ + Given a list of recent workflow run timestamps, determines whether + the workflow run should be rate limited. If so, raises the + AutomationWorkflowRateLimited error. 
+ """ + start_window = now - timedelta( + seconds=settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS + ) + + # Keep only past runs that are in the window + runs_in_window = [ + timestamp + for timestamp in previous_last_runs + if isinstance(timestamp, datetime) and timestamp > start_window + ] + + runs_in_window.append(now) + + return runs_in_window + + runs_in_window = global_cache.update( + cache_key, + update_last_run_cache, + default_value=lambda: [], + timeout=rate_cache_timeout, + ) + + if len(runs_in_window) > settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS: + return True + + started_workflows = ( + self._get_histories_for_current_workflow_version(workflow) + .filter(status=HistoryStatusChoices.STARTED) + .count() + ) + + if started_workflows > settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS: + return True + + return False + + def _check_too_many_errors(self, workflow: AutomationWorkflow) -> bool: + """ + Checks if the given workflow has too many consecutive errors. If so, + raises AutomationWorkflowTooManyErrors. + """ + + original_workflow = workflow.get_original() + + if original_workflow == workflow: + # We don't want to rate limit a test execution or a simulation + return False + + max_errors = settings.AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS + + statuses = ( + self._get_histories_for_current_workflow_version(workflow) + .exclude(status=HistoryStatusChoices.STARTED) + .order_by("-started_on") + .values_list("status", flat=True)[: max_errors + 1] + ) + + return ( + len([s for s in statuses if s == HistoryStatusChoices.ERROR]) > max_errors + ) + + def before_run(self, workflow: AutomationWorkflow) -> None: + """ + Runs pre-flight checks and actions before a workflow is allowed to run. + + Each check may raise a subclass of the AutomationWorkflowBeforeRunError error. 
+ """ + + original_workflow = workflow.get_original() + + # If we don't come from an event, we need to reset the states to prevent + # another execution + self.reset_workflow_temporary_states(original_workflow) + + # If we have history entries that are too old it probably means something + # went wrong with Celery so we mark these entries as failed. + self._mark_failure_for_timed_out_history(original_workflow) + + # We remove old history entries to avoid storing too many entries. + self._clear_old_history(original_workflow) + + if self._check_too_many_errors(workflow): + raise AutomationWorkflowTooManyErrors( + "The workflow was disabled due to too many consecutive errors." + ) + + if self._check_is_rate_limited(workflow): + # Early return if we had too many execution during a short amount of time + raise AutomationWorkflowRateLimited( + "The workflow was rate limited due to too many recent or unfinished " + "runs." + ) + + def async_start_workflow( self, workflow: AutomationWorkflow, - event_payload: Optional[Union[Dict, List[Dict]]], - simulate_until_node_id: Optional[int] = None, + event_payload: Optional[List[Dict]] = None, ) -> None: - """Runs the workflow.""" + """ + Runs the provided workflow in a celery task. - original_workflow = self.get_original_workflow(workflow) + :param workflow: The AutomationWorkflow ID that should be executed. + :param event_payload: The payload from the action. + """ - # If the currently running workflow is an unpublished workflow then we are - # testing it. 
- is_test_run = original_workflow == workflow + error = None + history_status = HistoryStatusChoices.ERROR + create_history_entry = True + + original_workflow = workflow.get_original() simulate_until_node = ( - workflow.get_graph().get_node(simulate_until_node_id) - if simulate_until_node_id + workflow.get_graph().get_node(workflow.simulate_until_node_id) + if workflow.simulate_until_node_id else None ) - history_handler = AutomationHistoryHandler() - history = history_handler.create_workflow_history( - original_workflow, - started_on=timezone.now(), - is_test_run=is_test_run, - event_payload=event_payload, - simulate_until_node=simulate_until_node, - ) - - error: Optional[str] = None - history_status: Optional[HistoryStatusChoices] = None - try: - self.before_run(original_workflow) + self.before_run(workflow) + except AutomationWorkflowRateLimited as e: + history_cache_timeout = ( + settings.AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS + ) + history_cache_key = self._get_workflow_history_rate_limit_cache_key( + original_workflow + ) + + error = str(e) + # We create an history entry only if we don't have a value in the cache + # It limits the number of created history entry to one every + # AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS seconds + if global_cache.get( + history_cache_key, default=True, timeout=history_cache_timeout + ): + # Set the value to prevent next executions + global_cache.update( + history_cache_key, + lambda v: False, + timeout=history_cache_timeout, + ) + else: + create_history_entry = False except AutomationWorkflowTooManyErrors as e: error = str(e) history_status = HistoryStatusChoices.DISABLED self.disable_workflow(workflow) except AutomationWorkflowBeforeRunError as e: error = str(e) - history_status = HistoryStatusChoices.ERROR + except Exception as e: + error = f"Unknown exception: {str(e)}" + + if error: + if create_history_entry and simulate_until_node is None: + now = timezone.now() - if error is not 
None and history_status is not None: - history.completed_on = timezone.now() - history.message = error - history.status = history_status - history.save() + AutomationHistoryHandler().create_workflow_history( + original_workflow, + is_test_run=original_workflow == workflow, + started_on=now, + completed_on=now, + message=error, + status=history_status, + ) return + # If the currently running workflow is an unpublished workflow then we are + # testing it. + is_test_run = original_workflow == workflow + + history = AutomationHistoryHandler().create_workflow_history( + original_workflow, + started_on=timezone.now(), + is_test_run=is_test_run, + event_payload=event_payload, + simulate_until_node=simulate_until_node, + ) + + transaction.on_commit( + lambda: start_workflow_celery_task.delay(workflow.id, history.id) + ) + + def start_workflow( + self, + workflow: AutomationWorkflow, + history: AutomationWorkflowHistory, + ) -> Signature: + """Runs the workflow.""" + return chain( dispatch_node_celery_task.si( workflow.get_trigger().id, history.id, ), handle_workflow_dispatch_done.si( - history_id=history.id if not simulate_until_node_id else None, - simulate_until_node_id=simulate_until_node_id, + history_id=history.id, + simulate_until_node_id=history.simulate_until_node_id, ), ) diff --git a/backend/src/baserow/contrib/automation/workflows/models.py b/backend/src/baserow/contrib/automation/workflows/models.py index 908a67e46d..f8ae096e17 100644 --- a/backend/src/baserow/contrib/automation/workflows/models.py +++ b/backend/src/baserow/contrib/automation/workflows/models.py @@ -16,7 +16,6 @@ HierarchicalModelMixin, OrderableMixin, TrashableModelMixin, - WithRegistry, ) if TYPE_CHECKING: @@ -48,7 +47,6 @@ class AutomationWorkflow( TrashableModelMixin, CreatedAndUpdatedOnMixin, OrderableMixin, - WithRegistry, ): automation = models.ForeignKey( "automation.Automation", on_delete=models.CASCADE, related_name="workflows" @@ -94,6 +92,24 @@ def get_last_order(cls, automation: 
"Automation"): queryset = AutomationWorkflow.objects.filter(automation=automation) return cls.get_highest_order_of_queryset(queryset) + 1 + def is_original(self) -> bool: + """ + Whether this is an original workflow. + """ + return not bool(self.automation.published_from_id) + + def get_original(self) -> "AutomationWorkflow": + """ + Gets the original workflow related to the current instance. + + :return: The original workflow that can be the current instance. + """ + + if self.automation.published_from_id: + return self.automation.published_from + else: + return self + def get_trigger(self) -> "AutomationTriggerNode": """ Returns the first node of the workflow A.K.A the trigger. diff --git a/backend/src/baserow/contrib/automation/workflows/tasks.py b/backend/src/baserow/contrib/automation/workflows/tasks.py index 33a0a3233e..a3749dae9f 100644 --- a/backend/src/baserow/contrib/automation/workflows/tasks.py +++ b/backend/src/baserow/contrib/automation/workflows/tasks.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Union +from typing import Optional from django.utils import timezone @@ -6,6 +6,7 @@ from baserow.config.celery import app from baserow.contrib.automation.history.constants import HistoryStatusChoices +from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.contrib.automation.history.models import AutomationWorkflowHistory from baserow.core.db import atomic_with_retry_on_deadlock @@ -13,18 +14,17 @@ @app.task(queue="automation_workflow") def start_workflow_celery_task( workflow_id: int, - event_payload: Optional[Union[Dict, List[Dict]]], - simulate_until_node_id: Optional[int] = None, + history_id: int, ): from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler @atomic_with_retry_on_deadlock() def _start(): workflow = AutomationWorkflowHandler().get_workflow(workflow_id) + history = AutomationHistoryHandler().get_workflow_history(history_id) return 
AutomationWorkflowHandler().start_workflow( workflow, - event_payload, - simulate_until_node_id=simulate_until_node_id, + history, ) result = _start() @@ -38,7 +38,7 @@ def _start(): @app.task def handle_workflow_dispatch_done( - history_id: Optional[int] = None, + history_id: int, simulate_until_node_id: Optional[int] = None, ): """ @@ -50,11 +50,12 @@ def handle_workflow_dispatch_done( """ if simulate_until_node_id: + # We just delete the history entry as we don't need it. AutomationWorkflowHistory.objects.filter( - simulate_until_node_id=simulate_until_node_id + id=history_id, simulate_until_node_id=simulate_until_node_id ).delete() - if history_id: + else: # Only update the history if it's still started. # If the workflow history was marked as failed by a specific node, we # don't want to overwrite it. diff --git a/backend/src/baserow/core/cache.py b/backend/src/baserow/core/cache.py index 73da84ecb5..9bf0ff4d40 100644 --- a/backend/src/baserow/core/cache.py +++ b/backend/src/baserow/core/cache.py @@ -207,7 +207,7 @@ def _get_versioned_cache_key( def get( self, key: str, - default: T | Callable[[], T] = None, + default: T | Callable[[], T] | None = None, invalidate_key: None | str = None, timeout: int = 60, ) -> T: @@ -275,7 +275,7 @@ def update( self, key: str, callback: Callable[[T], T], - default_value: T | Callable[[], T] = None, + default_value: T | Callable[[], T] | None = None, invalidate_key: None | str = None, timeout: int = 60, ) -> T: @@ -288,8 +288,8 @@ def update( try: default = default_value() if callable(default_value) else default_value - initial_value = cache.get(cache_key_to_use, default) - new_value = callback(initial_value) + current_cached_value = cache.get(cache_key_to_use, default) + new_value = callback(current_cached_value) cache.set( cache_key_to_use, new_value, diff --git a/backend/src/baserow/core/db.py b/backend/src/baserow/core/db.py index 573d694254..cf1646ca38 100644 --- a/backend/src/baserow/core/db.py +++ 
b/backend/src/baserow/core/db.py @@ -832,7 +832,7 @@ def atomic_with_retry_on_deadlock( ): """ Decorator that wraps a function in a transaction.atomic block and retries - when deadlock occurswith exponential backoff. + when deadlock occurs with exponential backoff. The decorated function must be idempotent - it should be safe to retry multiple times without changing behavior. Avoid modifying request.data or other mutable diff --git a/backend/tests/baserow/contrib/automation/history/utils.py b/backend/tests/baserow/contrib/automation/history/utils.py index 66b38aeedb..76ebaac38e 100644 --- a/backend/tests/baserow/contrib/automation/history/utils.py +++ b/backend/tests/baserow/contrib/automation/history/utils.py @@ -1,13 +1,15 @@ from baserow.contrib.automation.history.models import AutomationWorkflowHistory -def assert_history(workflow, expected_count, expected_status, expected_msg): +def assert_history( + workflow, expected_count, expected_status, expected_msg, history_index=-1 +): """Helper to test AutomationWorkflowHistory objects.""" - histories = AutomationWorkflowHistory.objects.filter(workflow=workflow) + histories = list(AutomationWorkflowHistory.objects.filter(workflow=workflow)) assert len(histories) == expected_count if expected_count > 0: - history = histories[0] + history = histories[history_index] assert history.workflow == workflow assert history.status == expected_status assert history.message == expected_msg diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py index ec6fb5d815..4b19c00bb9 100644 --- a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py +++ b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py @@ -11,7 +11,6 @@ AutomationWorkflowHistory, ) from baserow.contrib.automation.nodes.handler import AutomationNodeHandler -from baserow.contrib.automation.workflows.handler import 
AutomationWorkflowHandler from baserow.contrib.automation.workflows.tasks import handle_workflow_dispatch_done from baserow.test_utils.helpers import AnyInt, AnyStr @@ -130,7 +129,7 @@ def create_workflow( def create_workflow_history(data_fixture, workflow, trigger_table_fields): - original_workflow = AutomationWorkflowHandler().get_original_workflow(workflow) + original_workflow = workflow.get_original() return data_fixture.create_automation_workflow_history( workflow=original_workflow, event_payload={ @@ -163,9 +162,8 @@ def test_dispatch_node_service_error(data_fixture): data_fixture.create_local_baserow_create_row_action_node( workflow=trigger_node.workflow ) - original_workflow = AutomationWorkflowHandler().get_original_workflow( - trigger_node.workflow - ) + original_workflow = trigger_node.workflow.get_original() + workflow_history = data_fixture.create_automation_workflow_history( workflow=original_workflow ) @@ -417,7 +415,9 @@ def test_dispatch_node_dispatches_trigger_simulation( # Ensure workflow history is deleted, since we don't want history # entries for simulations. - handle_workflow_dispatch_done(simulate_until_node_id=trigger_node.id) + handle_workflow_dispatch_done( + workflow_history.id, simulate_until_node_id=trigger_node.id + ) assert ( AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() is False @@ -530,7 +530,9 @@ def test_dispatch_node_dispatches_action_simulation( # Ensure workflow history is deleted, since we don't want history # entries for simulations. 
- handle_workflow_dispatch_done(simulate_until_node_id=action_node.id) + handle_workflow_dispatch_done( + workflow_history.id, simulate_until_node_id=action_node.id + ) assert ( AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() is False @@ -590,7 +592,9 @@ def test_dispatch_node_dispatches_iterator_simulation( # No more nodes to dispatch assert result is None - handle_workflow_dispatch_done(simulate_until_node_id=iterator_child_2_node.id) + handle_workflow_dispatch_done( + workflow_history.id, simulate_until_node_id=iterator_child_2_node.id + ) # Make sure the last iterator node simulation saves a history entry iterator_child_2_node.service.specific.refresh_from_db() @@ -933,7 +937,7 @@ def test_dispatch_node_with_advanced_formulas(data_fixture): action_table_model = action_table.get_model() assert action_table_model.objects.count() == 0 - original_workflow = AutomationWorkflowHandler().get_original_workflow(workflow) + original_workflow = workflow.get_original() workflow_history = data_fixture.create_automation_workflow_history( workflow=original_workflow, event_payload={ @@ -1059,7 +1063,7 @@ def test_dispatch_node_dispatches_router_edge_simulation( for node in [trigger_node, router_a, router_b, action_node]: assert node.service.specific.sample_data is None - original_workflow = AutomationWorkflowHandler().get_original_workflow(workflow) + original_workflow = workflow.get_original() workflow_history = data_fixture.create_automation_workflow_history( workflow=original_workflow, event_payload={ @@ -1102,7 +1106,9 @@ def test_dispatch_node_dispatches_router_edge_simulation( ) # Verify workflow history is deleted for simulations - handle_workflow_dispatch_done(simulate_until_node_id=action_node.id) + handle_workflow_dispatch_done( + workflow_history.id, simulate_until_node_id=action_node.id + ) assert ( AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() is False @@ -1162,9 +1168,8 @@ def 
test_dispatch_node_iterator_with_no_rows(data_fixture): iterator_child_1_node = data["iterator_child_1_node"] # Create workflow history with 0 rows in the event payload. - original_workflow = AutomationWorkflowHandler().get_original_workflow( - trigger_node.workflow - ) + original_workflow = trigger_node.workflow.get_original() + workflow_history = data_fixture.create_automation_workflow_history( workflow=original_workflow, event_payload={ diff --git a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py index 72a0c1648a..d66e839759 100644 --- a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py +++ b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py @@ -27,6 +27,7 @@ AutomationWorkflowTooManyErrors, ) from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler +from baserow.core.cache import global_cache from baserow.core.trash.handler import TrashHandler from tests.baserow.contrib.automation.history.utils import assert_history @@ -486,7 +487,7 @@ def test_get_original_workflow_returns_original_workflow(data_fixture): published_workflow.automation.published_from = original_workflow published_workflow.automation.save() - workflow = AutomationWorkflowHandler().get_original_workflow(published_workflow) + workflow = published_workflow.get_original() assert workflow == original_workflow @@ -508,67 +509,106 @@ def test_trashing_workflow_deletes_published_workflow(data_fixture): assert AutomationWorkflow.objects.filter(id=published_workflow.id).exists() is False -@pytest.mark.parametrize("workflow_id", [10, 100, 200, 300]) -def test_get_rate_limit_cache_key(workflow_id): - result = AutomationWorkflowHandler()._get_rate_limit_cache_key(workflow_id) - assert result == f"automation_workflow_{workflow_id}" - +@pytest.mark.django_db +def 
test_check_is_rate_limited_returns_none_if_empty_cache(data_fixture): + original_workflow = data_fixture.create_automation_workflow() -def test_check_is_rate_limited_returns_none_if_empty_cache(): with freeze_time("2025-08-01 14:00:00"): - result = AutomationWorkflowHandler()._check_is_rate_limited(100) - assert result is None + result = AutomationWorkflowHandler()._check_is_rate_limited(original_workflow) + assert result is False @override_settings( AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5, AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=5, ) -def test_check_is_rate_limited_returns_none_if_below_limit(): +@pytest.mark.django_db +def test_check_is_rate_limited_returns_none_if_below_limit(data_fixture): + original_workflow = data_fixture.create_automation_workflow() + with freeze_time("2025-08-01 14:00:00"): for _ in range(4): - result = AutomationWorkflowHandler()._check_is_rate_limited(100) - assert result is None + result = AutomationWorkflowHandler()._check_is_rate_limited( + original_workflow + ) + assert result is False # This 5th attempt shouldn't be rate limited - result = AutomationWorkflowHandler()._check_is_rate_limited(100) - assert result is None + result = AutomationWorkflowHandler()._check_is_rate_limited(original_workflow) + assert result is False @override_settings( AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5, AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=5, ) -def test_check_is_rate_limited_returns_none_if_cache_expires(): +@pytest.mark.django_db +def test_check_is_rate_limited_returns_none_if_cache_expires(data_fixture): + original_workflow = data_fixture.create_automation_workflow() + with freeze_time("2025-08-01 14:00:00"): for _ in range(5): - result = AutomationWorkflowHandler()._check_is_rate_limited(100) - assert result is None + result = AutomationWorkflowHandler()._check_is_rate_limited( + original_workflow + ) + assert result is False # 6 seconds after the first/initial cache entry with freeze_time("2025-08-01 14:00:06"): # The 
next 5 requests should not be rate limited for _ in range(5): - result = AutomationWorkflowHandler()._check_is_rate_limited(100) - assert result is None + result = AutomationWorkflowHandler()._check_is_rate_limited( + original_workflow + ) + assert result is False @override_settings( AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5, AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=5, ) -def test_check_is_rate_limited_raises_if_above_limit(): +@pytest.mark.django_db +def test_check_is_rate_limited_raises_if_above_limit(data_fixture): + original_workflow = data_fixture.create_automation_workflow() + with freeze_time("2025-08-01 14:00:00"): for _ in range(5): - result = AutomationWorkflowHandler()._check_is_rate_limited(100) - assert result is None + result = AutomationWorkflowHandler()._check_is_rate_limited( + original_workflow + ) + assert result is False # This 6th attempt should be rate limited - with pytest.raises(AutomationWorkflowRateLimited) as e: - AutomationWorkflowHandler()._check_is_rate_limited(100) + assert ( + AutomationWorkflowHandler()._check_is_rate_limited(original_workflow) + is True + ) + + +@override_settings( + AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5, + AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=2, +) +@pytest.mark.django_db +def test_check_is_rate_limited_returns_true_if_too_many_started_workflows( + data_fixture, +): + original_workflow = data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + for _ in range(3): + data_fixture.create_automation_workflow_history( + workflow=original_workflow, status=HistoryStatusChoices.STARTED + ) + + with freeze_time("2025-08-01 14:00:00"): assert ( - str(e.value) == "The workflow was rate limited due to too many recent runs." 
+ AutomationWorkflowHandler()._check_is_rate_limited(published_workflow) + is True ) @@ -579,7 +619,7 @@ def test_check_is_rate_limited_raises_if_above_limit(): ) @patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") def test_workflow_rate_limiter_is_checked_before_starting_celery_task( - mock_celery_task, data_fixture + mock_celery_task, data_fixture, django_capture_on_commit_callbacks ): user = data_fixture.create_user() @@ -591,50 +631,69 @@ def test_workflow_rate_limiter_is_checked_before_starting_celery_task( published_workflow.automation.save() handler = AutomationWorkflowHandler() - rate_limited_error = "The workflow was rate limited due to too many recent runs." + rate_limited_error = ( + "The workflow was rate limited due to too many recent or unfinished runs." + ) with freeze_time("2026-01-26 13:00:00"): - # First 2 calls should queue workflow runs - handler.async_start_workflow(published_workflow) - handler.async_start_workflow(published_workflow) + with django_capture_on_commit_callbacks(execute=True): + # First 2 calls should queue workflow runs + handler.async_start_workflow(published_workflow) + handler.async_start_workflow(published_workflow) assert mock_celery_task.delay.call_count == 2 - assert_history(original_workflow, 0, "error", rate_limited_error) # 3rd call should be rate limited handler.async_start_workflow(published_workflow) assert mock_celery_task.delay.call_count == 2 - assert_history(original_workflow, 1, "error", rate_limited_error) + assert_history( + original_workflow, 3, "error", rate_limited_error, history_index=2 + ) @pytest.mark.django_db @override_settings( AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=5, ) +@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") @patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") -def test_should_create_rate_limited_workflow_history(mock_celery_task, data_fixture): - workflow_id = 99999 +def 
test_async_start_workflow_creates_rate_limited_history_once_until_cache_reset( + mock_celery_task, mock_before_run, data_fixture +): + user = data_fixture.create_user() + + original_workflow = data_fixture.create_automation_workflow(user=user) + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE, user=user + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + mock_before_run.side_effect = AutomationWorkflowRateLimited( + "The workflow was rate limited due to too many recent or unfinished runs." + ) + handler = AutomationWorkflowHandler() with freeze_time("2026-01-26 13:00:00"): - # True because this is the first time we're attempting to create a history - assert handler._should_create_rate_limited_workflow_history(workflow_id) is True + handler.async_start_workflow(published_workflow) + handler.async_start_workflow(published_workflow) + handler.async_start_workflow(published_workflow) - # False because a history already exists within the expiry window - assert ( - handler._should_create_rate_limited_workflow_history(workflow_id) is False - ) - assert ( - handler._should_create_rate_limited_workflow_history(workflow_id) is False - ) + histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + assert histories.count() == 1 + assert_history( + original_workflow, + 1, + "error", + "The workflow was rate limited due to too many recent or unfinished runs.", + ) + mock_celery_task.delay.assert_not_called() with freeze_time("2026-01-26 13:00:06"): - # True because the cache window of 5 seconds has expired - assert handler._should_create_rate_limited_workflow_history(workflow_id) is True + handler.async_start_workflow(published_workflow) - # False because a new history was created via the previous call to the method - assert ( - handler._should_create_rate_limited_workflow_history(workflow_id) is False - ) + histories = 
AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + assert histories.count() == 2 @pytest.mark.django_db @@ -674,14 +733,21 @@ def test_disable_workflow_disables_published_workflow(data_fixture): @pytest.mark.django_db def test_check_too_many_errors_raises_if_above_limit(data_fixture): original_workflow = data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() - for _ in range(4): + for _ in range(5): data_fixture.create_automation_workflow_history( workflow=original_workflow, status=HistoryStatusChoices.ERROR, ) - AutomationWorkflowHandler()._check_too_many_errors(original_workflow) + assert ( + AutomationWorkflowHandler()._check_too_many_errors(published_workflow) is False + ) # This 6th error should cause True to be returned data_fixture.create_automation_workflow_history( @@ -689,12 +755,42 @@ def test_check_too_many_errors_raises_if_above_limit(data_fixture): status=HistoryStatusChoices.ERROR, ) - with pytest.raises(AutomationWorkflowTooManyErrors) as e: - AutomationWorkflowHandler()._check_too_many_errors(original_workflow) + assert ( + AutomationWorkflowHandler()._check_too_many_errors(published_workflow) is True + ) + + +@override_settings(AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS=5) +@pytest.mark.django_db +def test_check_too_many_errors_does_not_trigger_for_unpublished_workflow(data_fixture): + workflow = data_fixture.create_automation_workflow() - assert str(e.value) == ( - f"The workflow {original_workflow.id} was disabled " - "due to too many consecutive errors." 
+ for _ in range(6): + data_fixture.create_automation_workflow_history( + workflow=workflow, + status=HistoryStatusChoices.ERROR, + ) + + assert AutomationWorkflowHandler()._check_too_many_errors(workflow) is False + + +@override_settings(AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS=5) +@pytest.mark.django_db +def test_check_too_many_errors_ignores_errors_before_latest_publish(data_fixture): + original_workflow = data_fixture.create_automation_workflow() + + with freeze_time("2026-03-10 10:00:00"): + for _ in range(6): + data_fixture.create_automation_workflow_history( + workflow=original_workflow, + status=HistoryStatusChoices.ERROR, + ) + + with freeze_time("2026-03-10 12:00:00"): + published_workflow = AutomationWorkflowHandler().publish(original_workflow) + + assert ( + AutomationWorkflowHandler()._check_too_many_errors(published_workflow) is False ) @@ -880,11 +976,15 @@ def test_clear_old_history_deletes_history_older_than_max_days(data_fixture): workflow = data_fixture.create_automation_workflow() with freeze_time("2025-02-01 12:00:00"): - old_history = data_fixture.create_automation_workflow_history(workflow=workflow) + old_history = data_fixture.create_automation_workflow_history( + workflow=workflow, + status=HistoryStatusChoices.SUCCESS, + ) with freeze_time("2025-02-02 12:00:00"): recent_history = data_fixture.create_automation_workflow_history( - workflow=workflow + workflow=workflow, + status=HistoryStatusChoices.SUCCESS, ) # This is 8 days after old_history was created, so it should be deleted. 
@@ -906,7 +1006,10 @@ def test_clear_old_history_keeps_only_max_entries(data_fixture): day += i with freeze_time(f"2025-02-{day} 12:00:00"): histories.append( - data_fixture.create_automation_workflow_history(workflow=workflow) + data_fixture.create_automation_workflow_history( + workflow=workflow, + status=HistoryStatusChoices.SUCCESS, + ) ) with freeze_time(f"2025-02-16 12:00:00"): @@ -943,9 +1046,9 @@ def test_clear_old_history_keeps_entries(data_fixture): @pytest.mark.django_db @patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") -@patch("baserow.contrib.automation.nodes.tasks.dispatch_node_celery_task") -def test_start_workflow_too_many_errors( - mock_dispatch_task, mock_before_run, data_fixture +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_too_many_errors( + mock_start_workflow_celery_task, mock_before_run, data_fixture ): mock_before_run.side_effect = AutomationWorkflowTooManyErrors( "mock too many errors" @@ -958,21 +1061,138 @@ def test_start_workflow_too_many_errors( published_workflow.automation.published_from = original_workflow published_workflow.automation.save() - AutomationWorkflowHandler().start_workflow(published_workflow, None, None) + AutomationWorkflowHandler().async_start_workflow(published_workflow) - # Nodes shouldn't be dispatched because before_run() should return early. - mock_dispatch_task.delay.assert_not_called() + # The workflow shouldn't be started because before_run() should return early. 
+ mock_start_workflow_celery_task.delay.assert_not_called() + assert_history(original_workflow, 1, "disabled", "mock too many errors") - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + original_workflow.refresh_from_db() + published_workflow.refresh_from_db() + + assert original_workflow.state == WorkflowState.DISABLED + assert published_workflow.state == WorkflowState.DISABLED + + +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_with_simulate_until_node( + mock_start_workflow_celery_task, data_fixture, django_capture_on_commit_callbacks +): + workflow = data_fixture.create_automation_workflow() + trigger = workflow.get_trigger() + workflow.simulate_until_node = trigger + workflow.save(update_fields=["simulate_until_node"]) + + with django_capture_on_commit_callbacks(execute=True): + AutomationWorkflowHandler().async_start_workflow(workflow) + + workflow.refresh_from_db() + history = workflow.workflow_histories.get() + + assert workflow.simulate_until_node is None + assert history.is_test_run is True + assert history.simulate_until_node_id == trigger.id + + mock_start_workflow_celery_task.delay.assert_called_once_with( + workflow.id, history.id + ) + + +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_with_simulate_until_node_and_error_creates_no_history( + mock_start_workflow_celery_task, mock_before_run, data_fixture +): + workflow = data_fixture.create_automation_workflow() + trigger = workflow.get_trigger() + workflow.simulate_until_node = trigger + workflow.save(update_fields=["simulate_until_node"]) - assert len(histories) == 1 + mock_before_run.side_effect = AutomationWorkflowBeforeRunError("unexpected error") - history = histories[0] - assert history.workflow == original_workflow - assert history.status == 
HistoryStatusChoices.DISABLED + AutomationWorkflowHandler().async_start_workflow(workflow) - error_msg = "mock too many errors" - assert history.message == error_msg + workflow.refresh_from_db() + + assert workflow.workflow_histories.count() == 0 + mock_start_workflow_celery_task.delay.assert_not_called() + + +@pytest.mark.django_db +@override_settings( + AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=4, + AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=2, + AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=2, + AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS=2, +) +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_rate_limited_runs_eventually_disable_workflow( + mock_start_workflow_celery_task, data_fixture, django_capture_on_commit_callbacks +): + original_workflow = data_fixture.create_automation_workflow() + with freeze_time("2026-03-08 12:00:00"): + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + with freeze_time("2026-03-10 12:00:00"): + with django_capture_on_commit_callbacks(execute=True): + # Two regular history entries + AutomationWorkflowHandler().async_start_workflow(published_workflow) + AutomationWorkflowHandler().async_start_workflow(published_workflow) + # only this one is an error + AutomationWorkflowHandler().async_start_workflow(published_workflow) + + assert original_workflow.workflow_histories.count() == 3 + + with freeze_time("2026-03-10 12:00:03"): + # Should create another error + AutomationWorkflowHandler().async_start_workflow(published_workflow) + AutomationWorkflowHandler().async_start_workflow(published_workflow) + AutomationWorkflowHandler().async_start_workflow(published_workflow) + + assert original_workflow.workflow_histories.count() == 4 + + with freeze_time("2026-03-10 12:00:06"): + with 
django_capture_on_commit_callbacks(execute=True): + AutomationWorkflowHandler().async_start_workflow(published_workflow) + + # Another error is created but we also disable the workflow + AutomationWorkflowHandler().async_start_workflow(published_workflow) + + histories = list( + AutomationWorkflowHistory.objects.filter(workflow=original_workflow).order_by( + "started_on" + ) + ) + + assert len(histories) == 6 + + assert histories[2].status == HistoryStatusChoices.ERROR + assert histories[2].message == ( + "The workflow was rate limited due to too many recent or unfinished runs." + ) + assert histories[3].status == HistoryStatusChoices.ERROR + assert histories[3].message == ( + "The workflow was rate limited due to too many recent or unfinished runs." + ) + + assert histories[4].status == HistoryStatusChoices.ERROR + assert histories[4].message == ( + "The workflow was rate limited due to too many recent or unfinished runs." + ) + + assert histories[5].status == HistoryStatusChoices.DISABLED + assert histories[5].message == ( + "The workflow was disabled due to too many consecutive errors." 
+ ) + + # Only the first 2 calls should have queued the celery task + assert mock_start_workflow_celery_task.delay.call_count == 2 original_workflow.refresh_from_db() published_workflow.refresh_from_db() @@ -981,11 +1201,64 @@ def test_start_workflow_too_many_errors( assert published_workflow.state == WorkflowState.DISABLED +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.transaction.on_commit") +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_queues_celery_task_on_commit( + mock_start_workflow_celery_task, mock_on_commit, data_fixture +): + workflow = data_fixture.create_automation_workflow() + + AutomationWorkflowHandler().async_start_workflow(workflow) + + history = workflow.workflow_histories.get() + + mock_on_commit.assert_called_once() + mock_start_workflow_celery_task.delay.assert_not_called() + + mock_on_commit.call_args.args[0]() + mock_start_workflow_celery_task.delay.assert_called_once_with( + workflow.id, history.id + ) + + +@pytest.mark.django_db +@override_settings( + AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=30, + AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=30, + AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=2, +) +def test_check_is_rate_limited_ignores_runs_before_latest_publish(data_fixture): + original_workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + with freeze_time("2026-03-10 10:00:00"): + for _ in range(3): + data_fixture.create_automation_workflow_history( + workflow=original_workflow, + status=HistoryStatusChoices.STARTED, + ) + global_cache.update( + handler._get_rate_limit_cache_key(original_workflow), + lambda _: [ + datetime.datetime(2026, 3, 10, 10, 0, 0, tzinfo=datetime.timezone.utc), + datetime.datetime(2026, 3, 10, 10, 0, 1, tzinfo=datetime.timezone.utc), + datetime.datetime(2026, 3, 10, 10, 0, 2, tzinfo=datetime.timezone.utc), + ], + default_value=lambda: [], + timeout=30, + ) + + with 
freeze_time("2026-03-10 12:00:00"): + 
published_workflow = handler.publish(original_workflow) + assert handler._check_is_rate_limited(published_workflow) is False + + @pytest.mark.django_db @patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") -@patch("baserow.contrib.automation.nodes.tasks.dispatch_node_celery_task") -def test_start_workflow_before_run_error( - mock_dispatch_task, mock_before_run, data_fixture +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_before_run_error( + mock_start_workflow_celery_task, mock_before_run, data_fixture ): # We already test the specific AutomationWorkflowTooManyErrors error above, # but we should also test that before_run() has error handling. @@ -998,24 +1271,150 @@ def test_start_workflow_before_run_error( published_workflow.automation.published_from = original_workflow published_workflow.automation.save() - AutomationWorkflowHandler().start_workflow(published_workflow, None, None) + AutomationWorkflowHandler().async_start_workflow(published_workflow) - # Nodes shouldn't be dispatched because before_run() should return early. - mock_dispatch_task.delay.assert_not_called() + # The workflow shouldn't be started because before_run() should return early. 
+ mock_start_workflow_celery_task.delay.assert_not_called() + assert_history(original_workflow, 1, "error", "unexpected error") - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + original_workflow.refresh_from_db() + published_workflow.refresh_from_db() - assert len(histories) == 1 + assert original_workflow.state == WorkflowState.DRAFT + assert published_workflow.state == WorkflowState.LIVE - history = histories[0] - assert history.workflow == original_workflow - assert history.status == HistoryStatusChoices.ERROR - error_msg = "unexpected error" - assert history.message == error_msg +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_unexpected_error_creates_history( + mock_start_workflow_celery_task, mock_before_run, data_fixture +): + mock_before_run.side_effect = Exception("unexpected error") + + original_workflow = data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + AutomationWorkflowHandler().async_start_workflow(published_workflow) + + mock_start_workflow_celery_task.delay.assert_not_called() + assert_history(original_workflow, 1, "error", "Unknown exception: unexpected error") + + history = original_workflow.workflow_histories.get() + assert history.started_on is not None + assert history.completed_on == history.started_on original_workflow.refresh_from_db() published_workflow.refresh_from_db() assert original_workflow.state == WorkflowState.DRAFT assert published_workflow.state == WorkflowState.LIVE + + +@override_settings(AUTOMATION_WORKFLOW_TIMEOUT_HOURS=1) +@pytest.mark.django_db +def test_before_run_marks_timed_out_started_history_as_failed(data_fixture): + original_workflow = 
data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + with freeze_time("2026-03-10 10:00:00"): + timed_out_history = data_fixture.create_automation_workflow_history( + workflow=original_workflow, + status=HistoryStatusChoices.STARTED, + ) + + with freeze_time("2026-03-10 12:00:00"): + AutomationWorkflowHandler().before_run(published_workflow) + + timed_out_history.refresh_from_db() + + assert timed_out_history.status == HistoryStatusChoices.ERROR + assert timed_out_history.message == "This workflow took too long and was timed out." + + +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_unknown_exception( + mock_start_workflow_celery_task, mock_before_run, data_fixture +): + mock_before_run.side_effect = Exception("unexpected failure") + + original_workflow = data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + AutomationWorkflowHandler().async_start_workflow(published_workflow) + + mock_start_workflow_celery_task.delay.assert_not_called() + assert_history( + original_workflow, + 1, + "error", + "Unknown exception: unexpected failure", + ) + + +@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=2) +@pytest.mark.django_db +def test_clear_old_history_excludes_started_workflows_max_entries(data_fixture): + workflow = data_fixture.create_automation_workflow() + + # Create three history entries + with freeze_time("2026-03-10 12:00:00"): + started_history = data_fixture.create_automation_workflow_history( + workflow=workflow, 
status=HistoryStatusChoices.STARTED + ) + + with freeze_time("2026-03-10 13:00:00"): + data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.SUCCESS + ) + + with freeze_time("2026-03-10 14:00:00"): + data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.SUCCESS + ) + + # Although max entries is 2 and the oldest history should be deleted, + # the oldest one is still kept because its status is STARTED. + with freeze_time("2026-03-10 15:00:00"): + AutomationWorkflowHandler()._clear_old_history(workflow) + + assert workflow.workflow_histories.filter(id=started_history.id).exists() is True + assert workflow.workflow_histories.count() == 3 + + +@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=1) +@pytest.mark.django_db +def test_clear_old_history_excludes_started_workflows_max_days(data_fixture): + workflow = data_fixture.create_automation_workflow() + + with freeze_time("2026-03-10 12:00:00"): + history_1 = data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.STARTED + ) + + with freeze_time("2026-03-11 12:00:00"): + history_2 = data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.SUCCESS + ) + + # After 2 days, both history entries are older than MAX_DAYS, but since + # history_1 hasn't finished yet it shouldn't be deleted. 
+ with freeze_time("2026-03-13 12:00:00"): + AutomationWorkflowHandler()._clear_old_history(workflow) + + assert workflow.workflow_histories.filter(id=history_1.id).exists() is True + assert workflow.workflow_histories.filter(id=history_2.id).exists() is False diff --git a/changelog/entries/unreleased/bug/fix_race_condition_in_loop_handling.json b/changelog/entries/unreleased/bug/fix_race_condition_in_loop_handling.json new file mode 100644 index 0000000000..08e830417e --- /dev/null +++ b/changelog/entries/unreleased/bug/fix_race_condition_in_loop_handling.json @@ -0,0 +1,9 @@ +{ + "type": "bug", + "message": "Fix race condition in loop handling", + "issue_origin": "github", + "issue_number": null, + "domain": "automation", + "bullet_points": [], + "created_at": "2026-03-12" +} \ No newline at end of file diff --git a/changelog/entries/unreleased/bug/fixed_a_bug_where_automation_workflow_history_entries_where_.json b/changelog/entries/unreleased/bug/fixed_a_bug_where_automation_workflow_history_entries_where_.json new file mode 100644 index 0000000000..30abca0091 --- /dev/null +++ b/changelog/entries/unreleased/bug/fixed_a_bug_where_automation_workflow_history_entries_where_.json @@ -0,0 +1,9 @@ +{ + "type": "bug", + "message": "Fixed a bug where automation workflow history entries were being deleted for running workflows.", + "issue_origin": "github", + "issue_number": null, + "domain": "automation", + "bullet_points": [], + "created_at": "2026-03-10" +} diff --git a/docker-compose.no-caddy.yml b/docker-compose.no-caddy.yml index f1cc6f4135..d1d7958d0d 100644 --- a/docker-compose.no-caddy.yml +++ b/docker-compose.no-caddy.yml @@ -89,6 +89,7 @@ x-backend-variables: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: 
BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: @@ -247,6 +248,7 @@ services: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: depends_on: diff --git a/docker-compose.yml b/docker-compose.yml index db47e3eeab..12daf3ef17 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -102,6 +102,7 @@ x-backend-variables: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: @@ -325,6 +326,7 @@ services: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: BASEROW_INTEGRATIONS_PERIODIC_MINUTE_MIN: diff --git a/docs/installation/configuration.md b/docs/installation/configuration.md index aa96df71f0..c0d66d6153 100644 --- a/docs/installation/configuration.md +++ b/docs/installation/configuration.md @@ -202,6 +202,19 @@ The installation methods referred to in the variable descriptions are: | BASEROW\_TWO\_WAY\_SYNC\_MAX\_CONSECUTIVE\_FAILURES | Indicates the maximum number of consecutive two-way data sync updates before deactivating the two-way sync. | 8 | | BASEROW\_TWO\_WAY\_SYNC\_MAX_RETRIES | Indicates the maximum of or two-way data sync create, update, or delete row retries before considering it a failure. 
| 3 | +### Automation Workflow Configuration + +| Name | Description | Defaults | +|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------|----------| +| BASEROW\_AUTOMATION\_HISTORY\_PAGE\_SIZE\_LIMIT | The maximum number of automation history entries returned per page. | 100 | +| BASEROW\_AUTOMATION\_WORKFLOW\_RATE\_LIMIT\_MAX\_RUNS | The maximum number of workflow runs that can be started within the rate limit window before new runs are blocked. | 10 | +| BASEROW\_AUTOMATION\_WORKFLOW\_RATE\_LIMIT\_CACHE\_EXPIRY\_SECONDS | The number of seconds the workflow rate limit counters are retained in cache. | 5 | +| BASEROW\_AUTOMATION\_WORKFLOW\_HISTORY\_RATE\_LIMIT\_CACHE\_EXPIRY\_SECONDS | The number of seconds the workflow history rate limit counters are retained in cache. If unset, it uses the workflow rate limit cache expiry. | 5 | +| BASEROW\_AUTOMATION\_WORKFLOW\_MAX\_CONSECUTIVE\_ERRORS | The maximum number of consecutive workflow errors allowed before the workflow is disabled. | 5 | +| BASEROW\_AUTOMATION\_WORKFLOW\_TIMEOUT\_HOURS | The number of hours after which a running workflow is considered timed out. | 24 | +| BASEROW\_AUTOMATION\_WORKFLOW\_HISTORY\_MAX\_DAYS | The number of days automation workflow history entries are retained. | 30 | +| BASEROW\_AUTOMATION\_WORKFLOW\_HISTORY\_MAX\_ENTRIES | The maximum number of workflow history entries retained per workflow. 
| 50 | + ### Backend Application Builder Configuration | Name | Description | Defaults | |---------------------------|--------------------------------------------------------------------------------------------------------------------------|------------------------| diff --git a/web-frontend/modules/automation/components/sidebar/SidebarItemAutomation.vue b/web-frontend/modules/automation/components/sidebar/SidebarItemAutomation.vue index 2d80daf121..299badac07 100644 --- a/web-frontend/modules/automation/components/sidebar/SidebarItemAutomation.vue +++ b/web-frontend/modules/automation/components/sidebar/SidebarItemAutomation.vue @@ -1,5 +1,5 @@