From 100e0e1481c5d526a579d8041f8f4bdc5629d99f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9mie=20Pardou?= <571533+jrmi@users.noreply.github.com> Date: Thu, 5 Mar 2026 09:54:11 +0100 Subject: [PATCH 1/2] fix: 401 poping in Sentry (#4910) --- ...or_page_on_token_expiration_to_redirect_to_login.json | 9 +++++++++ web-frontend/modules/core/pages/workspace.vue | 1 - web-frontend/modules/core/plugins/clientHandler.js | 5 ++++- web-frontend/sentry.client.config.ts | 2 +- web-frontend/sentry.server.config.ts | 2 +- 5 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 changelog/entries/unreleased/bug/fix_error_page_on_token_expiration_to_redirect_to_login.json diff --git a/changelog/entries/unreleased/bug/fix_error_page_on_token_expiration_to_redirect_to_login.json b/changelog/entries/unreleased/bug/fix_error_page_on_token_expiration_to_redirect_to_login.json new file mode 100644 index 0000000000..307a53c430 --- /dev/null +++ b/changelog/entries/unreleased/bug/fix_error_page_on_token_expiration_to_redirect_to_login.json @@ -0,0 +1,9 @@ +{ + "type": "bug", + "message": "Fix error page on token expiration to redirect to login", + "issue_origin": "github", + "issue_number": null, + "domain": "core", + "bullet_points": [], + "created_at": "2026-03-04" +} \ No newline at end of file diff --git a/web-frontend/modules/core/pages/workspace.vue b/web-frontend/modules/core/pages/workspace.vue index a1d9ade3cc..740fca211e 100644 --- a/web-frontend/modules/core/pages/workspace.vue +++ b/web-frontend/modules/core/pages/workspace.vue @@ -331,7 +331,6 @@ const { throw createError({ statusCode: 400, message: 'Error loading dashboard.', - fatal: true, }) } } diff --git a/web-frontend/modules/core/plugins/clientHandler.js b/web-frontend/modules/core/plugins/clientHandler.js index 53eed07561..8e1dbbba6a 100644 --- a/web-frontend/modules/core/plugins/clientHandler.js +++ b/web-frontend/modules/core/plugins/clientHandler.js @@ -498,7 +498,10 @@ export function 
makeErrorResponseInterceptor( // user session expired. Redirect to login page to start a new session. if (rspData?.error === 'ERROR_INVALID_REFRESH_TOKEN') { - nuxtErrorHandler({ statusCode: 401, message: 'User session expired' }) + store.dispatch('auth/forceLogoff') + nuxtErrorHandler( + createError({ statusCode: 401, message: 'User session expired' }) + ) return Promise.reject(error) } diff --git a/web-frontend/sentry.client.config.ts b/web-frontend/sentry.client.config.ts index 89a9f2ef63..5ba99ae665 100644 --- a/web-frontend/sentry.client.config.ts +++ b/web-frontend/sentry.client.config.ts @@ -30,7 +30,7 @@ if (dsn && dsn !== '') { ...(isDev ? { transport: makeFakeTransport } : {}), beforeSend(event, hint) { const err = hint?.originalException - if (err?.fatal === false) return null + if (err?.fatal === false || err?.response?.status === 401) return null if (isDev) { console.error('[Sentry captured error]', `${err}`) return null diff --git a/web-frontend/sentry.server.config.ts b/web-frontend/sentry.server.config.ts index 13ec449a0b..bd6b3b18a5 100644 --- a/web-frontend/sentry.server.config.ts +++ b/web-frontend/sentry.server.config.ts @@ -18,7 +18,7 @@ if (dsn && dsn !== '') { ...(isDev ? 
{ transport: makeFakeTransport } : {}), beforeSend(event, hint) { const err = hint?.originalException - if (err?.fatal === false) return null + if (err?.fatal === false || err?.response?.status === 401) return null if (isDev) { console.error('[Sentry captured error]', err) return null From d48df9eca8c621811ee6f758ded66e6f61d591b3 Mon Sep 17 00:00:00 2001 From: Tsering Paljor Date: Thu, 5 Mar 2026 13:08:15 +0400 Subject: [PATCH 2/2] feat: dispatch Automation Nodes in separate Celery tasks (#4645) * Refactor to support async node dispatch * Store history_id and current_iterations in dispatch context * Add simulate_until_node to AutomationWorkflowHistory * Simplify dispatch: history is always created * CorePeriodicServiceType should do a truthy check, since event_payload is loaded from the DB and can be {} * Make sure current_iterations is considered for next iteration nodes. * Initial async dispatch tests * Simplify test * Add trigger simulation test * Add test for action node simulation * Add remaining async node dispatch tests * Add tests for dispatching async router, update, and delete nodes * Add test for dispatch async with advanced formulas * Remove old test_node_dispatch.py , which is replaced by the async version test_node_dispatch_async.py * Add test for simulating complex router node * Remove unused tests, port complex router simulation test to async module * Fix existing tests since after_dispatch() was removed. * Remove old workflow error history tests, they're covered in test_node_dispatch_async.py * Refactor 'too many errors' handling. 
* Refactor test; replace after_dispatch() with previous_nodes_results * Add tests for history clean-up * Regenerate migrations * Ensure simulation history is excluded from get_workflow_history() * Add test for before_run() error handling * Add missing bool returns * Add comment * Add changelog * Fix test * Simplify allowed nodes * Add docstring * Make dry * Fix error condition * Improve comment * Add tests for start_workflow() * Remove unnecessary tests * Lint fix * Add specific iterator child dispatch test * Optimize fetching simulate_until_node * Use parent node to find current iteration * Rename dispatch_node_async() to dispatch_node() * Rename _update_previous_results to _load_previous_results * Use Celery Canvas to dispatch nodes with support for parallel branch execution. * WIP - fix tests * Simplify node_dispatch() by always returning a signature. * Refactor tests to account for Celery canvas: test_node_dispatch_async.py * Return self.replace() * Ensure chord callback doesn't overwrite any errors. * Ensure an empty chain is not scheduled. * Ensure start_workflow() uses same pattern as node dispatch task * Fix test * Event payload can be None * Re-add is_test_run for ZDM * Add db_default to new fields * Rename get_workflow_history -> get_workflow_histories(). Refactor get_workflow_history() to return by ID. * Clean up condition * Add explicit else clause with comment * use local_cache to get previous nodes results. * Regenerate migrations * Invalidate the cache after the dispatch() and node results are saved. 
* Fix tests * Fix test * Make history_id mandatory * Use cached workflow to fetch node * Clear the cache manually in tests * Pass in fake history_id to dispatch context --- .env.example | 2 + backend/src/baserow/config/settings/base.py | 12 + .../contrib/automation/api/workflows/views.py | 2 +- .../automation/automation_dispatch_context.py | 67 +- .../contrib/automation/history/exceptions.py | 17 + .../contrib/automation/history/handler.py | 87 +- .../contrib/automation/history/models.py | 64 +- .../contrib/automation/history/service.py | 4 +- ...nworkflowhistory_event_payload_and_more.py | 64 + .../contrib/automation/nodes/handler.py | 220 ++- .../baserow/contrib/automation/nodes/tasks.py | 39 + .../contrib/automation/workflows/handler.py | 125 +- .../contrib/automation/workflows/tasks.py | 67 +- .../test_utils/fixtures/automation_node.py | 147 ++ .../api/workflows/test_workflow_views.py | 6 +- .../test_data_provider_types.py | 101 +- .../history/test_history_handler.py | 74 +- .../history/test_history_service.py | 10 +- .../automation/nodes/test_node_dispatch.py | 477 ------- .../nodes/test_node_dispatch_async.py | 1190 +++++++++++++++++ .../automation/nodes/test_node_handler.py | 348 +---- .../automation/nodes/test_node_types.py | 12 +- .../workflows/test_workflow_handler.py | 149 +++ .../workflows/test_workflow_tasks.py | 148 -- .../test_slack_write_message_service_type.py | 25 +- ...ching_automation_nodes_asynchronously.json | 9 + docker-compose.no-caddy.yml | 4 + docker-compose.yml | 4 + 28 files changed, 2297 insertions(+), 1177 deletions(-) create mode 100644 backend/src/baserow/contrib/automation/history/exceptions.py create mode 100644 backend/src/baserow/contrib/automation/migrations/0024_automationworkflowhistory_event_payload_and_more.py create mode 100644 backend/src/baserow/contrib/automation/nodes/tasks.py delete mode 100644 backend/tests/baserow/contrib/automation/nodes/test_node_dispatch.py create mode 100644 
backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py delete mode 100644 backend/tests/baserow/contrib/automation/workflows/test_workflow_tasks.py create mode 100644 changelog/entries/unreleased/refactor/3838_add_support_for_dispatching_automation_nodes_asynchronously.json diff --git a/.env.example b/.env.example index 53e8a2ee9f..f5a17fcff7 100644 --- a/.env.example +++ b/.env.example @@ -67,6 +67,8 @@ DATABASE_NAME=baserow # BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS= # BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS= # BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS= +# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS= +# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES= # BASEROW_EXTRA_ALLOWED_HOSTS= # ADDITIONAL_APPS= # ADDITIONAL_MODULES= diff --git a/backend/src/baserow/config/settings/base.py b/backend/src/baserow/config/settings/base.py index f470b5f1d8..42305d83fc 100644 --- a/backend/src/baserow/config/settings/base.py +++ b/backend/src/baserow/config/settings/base.py @@ -208,6 +208,12 @@ "CELERY_REDBEAT_LOCK_TIMEOUT", CELERY_BEAT_MAX_LOOP_INTERVAL + 60 ) +CELERY_RESULT_BACKEND = REDIS_URL +CELERY_RESULT_EXPIRES = int( + # default 1 hour + os.getenv("CELERY_RESULT_EXPIRES") or 3600 +) + CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", @@ -819,6 +825,12 @@ def __setitem__(self, key, value): AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS = int( os.getenv("BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS", 5) ) +AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS = int( + os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS", 30) +) +AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES = int( + os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES", 50) +) TRASH_PAGE_SIZE_LIMIT = 200 # How many trash entries can be requested at once. 
diff --git a/backend/src/baserow/contrib/automation/api/workflows/views.py b/backend/src/baserow/contrib/automation/api/workflows/views.py index e6ade72773..559d5c8600 100644 --- a/backend/src/baserow/contrib/automation/api/workflows/views.py +++ b/backend/src/baserow/contrib/automation/api/workflows/views.py @@ -243,7 +243,7 @@ class AutomationWorkflowHistoryView(APIView): } ) def get(self, request, workflow_id: int): - queryset = AutomationHistoryService().get_workflow_history( + queryset = AutomationHistoryService().get_workflow_histories( request.user, workflow_id ) diff --git a/backend/src/baserow/contrib/automation/automation_dispatch_context.py b/backend/src/baserow/contrib/automation/automation_dispatch_context.py index a6e32ef28b..0d05c87585 100644 --- a/backend/src/baserow/contrib/automation/automation_dispatch_context.py +++ b/backend/src/baserow/contrib/automation/automation_dispatch_context.py @@ -3,22 +3,25 @@ from baserow.contrib.automation.data_providers.registries import ( automation_data_provider_type_registry, ) -from baserow.contrib.automation.nodes.models import AutomationActionNode, AutomationNode +from baserow.contrib.automation.history.models import AutomationNodeResult +from baserow.contrib.automation.nodes.models import AutomationActionNode from baserow.contrib.automation.workflows.models import AutomationWorkflow +from baserow.core.cache import local_cache from baserow.core.services.dispatch_context import DispatchContext from baserow.core.services.models import Service -from baserow.core.services.types import DispatchResult from baserow.core.services.utils import ServiceAdhocRefinements class AutomationDispatchContext(DispatchContext): - own_properties = ["workflow", "event_payload", "simulate_until_node"] + own_properties = ["workflow", "event_payload", "history_id"] def __init__( self, workflow: AutomationWorkflow, + history_id: int, event_payload: Optional[Union[Dict, List[Dict]]] = None, simulate_until_node: 
Optional[AutomationActionNode] = None, + current_iterations: Optional[Dict[int, int]] = None, ): """ The `DispatchContext` implementation for automations. This context is provided @@ -26,18 +29,25 @@ def __init__( node's changes. :param workflow: The workflow that this dispatch context is associated with. + :param history_id: The AutomationWorkflowHistory ID from which the + workflow's event payload and node results are derived. :param event_payload: The event data from the trigger node, if any was provided, as this is optional. :param simulate_until_node: Stop simulating the dispatch once this node is reached. + :param current_iterations: Used by the Iterator node's children. """ self.workflow = workflow - self.previous_nodes_results: Dict[int, Any] = {} - self.dispatch_history: List[int] = [] + self.history_id = history_id self.simulate_until_node = simulate_until_node self.current_iterations: Dict[int, int] = {} + if current_iterations: + # The keys are strings due to JSON serialization by Celery. We need + # to convert them back to ints. + self.current_iterations = {int(k): v for k, v in current_iterations.items()} + services = ( [self.simulate_until_node.service.specific] if self.simulate_until_node @@ -59,16 +69,40 @@ def __init__( def clone(self, **kwargs): new_context = super().clone(**kwargs) - new_context.previous_nodes_results = {**self.previous_nodes_results} new_context.current_iterations = {**self.current_iterations} - new_context.dispatch_history = list(self.dispatch_history) - return new_context + def _get_previous_results_cache_key(self) -> Optional[str]: + return f"wa_previous_nodes_results_{self.history_id}" + + def _load_previous_results(self) -> Dict[int, Any]: + """ + Returns a dict where keys are the node IDs and values are the results + of the previous_nodes_results. 
+ """ + + results = {} + previous_results = AutomationNodeResult.objects.filter( + node_history__workflow_history_id=self.history_id + ).select_related("node_history__node") + for result in previous_results: + results[result.node_history.node_id] = result.result + + return results + @property def data_provider_registry(self): return automation_data_provider_type_registry + @property + def previous_nodes_results(self) -> Dict[int, Any]: + if cache_key := self._get_previous_results_cache_key(): + return local_cache.get( + cache_key, + lambda: self._load_previous_results(), + ) + return {} + def get_timezone_name(self) -> str: """ TODO: Get the timezone from the application settings. For now, returns @@ -77,23 +111,6 @@ def get_timezone_name(self) -> str: return super().get_timezone_name() - def _register_node_result( - self, node: AutomationNode, dispatch_data: Dict[str, Any] - ): - self.previous_nodes_results[node.id] = dispatch_data - - def after_dispatch(self, node: AutomationNode, dispatch_result: DispatchResult): - """ - This method is called after each node dispatch. It can be used - to perform any final actions or cleanup. 
- """ - - self.dispatch_history.append(node.id) - self._register_node_result(node, dispatch_result.data) - - def set_current_iteration(self, node, index): - self.current_iterations[node.id] = index - def range(self, service: Service): return [0, None] diff --git a/backend/src/baserow/contrib/automation/history/exceptions.py b/backend/src/baserow/contrib/automation/history/exceptions.py new file mode 100644 index 0000000000..aeacd0b2cd --- /dev/null +++ b/backend/src/baserow/contrib/automation/history/exceptions.py @@ -0,0 +1,17 @@ +from baserow.contrib.automation.exceptions import AutomationError + + +class AutomationWorkflowHistoryError(AutomationError): + pass + + +class AutomationWorkflowHistoryDoesNotExist(AutomationWorkflowHistoryError): + """When the history entry doesn't exist.""" + + def __init__(self, history_id=None, *args, **kwargs): + self.history_id = history_id + super().__init__( + f"The automation workflow history {history_id} does not exist.", + *args, + **kwargs, + ) diff --git a/backend/src/baserow/contrib/automation/history/handler.py b/backend/src/baserow/contrib/automation/history/handler.py index cb3e06a67f..d1bafeea68 100644 --- a/backend/src/baserow/contrib/automation/history/handler.py +++ b/backend/src/baserow/contrib/automation/history/handler.py @@ -1,42 +1,117 @@ from datetime import datetime -from typing import Optional +from typing import Dict, List, Optional, Union from django.db.models import QuerySet from baserow.contrib.automation.history.constants import HistoryStatusChoices -from baserow.contrib.automation.history.models import AutomationWorkflowHistory +from baserow.contrib.automation.history.exceptions import ( + AutomationWorkflowHistoryDoesNotExist, +) +from baserow.contrib.automation.history.models import ( + AutomationNodeHistory, + AutomationNodeResult, + AutomationWorkflowHistory, +) +from baserow.contrib.automation.nodes.models import AutomationNode from baserow.contrib.automation.workflows.models import 
AutomationWorkflow class AutomationHistoryHandler: - def get_workflow_history( + def get_workflow_histories( self, workflow: AutomationWorkflow, base_queryset: Optional[QuerySet] = None ) -> QuerySet[AutomationWorkflowHistory]: """ Returns all the AutomationWorkflowHistory related to the provided workflow. + + Excludes any simulation histories that haven't yet been deleted. """ if base_queryset is None: base_queryset = AutomationWorkflowHistory.objects.all() - return base_queryset.filter(workflow=workflow).prefetch_related( - "workflow__automation__workspace" - ) + return base_queryset.filter( + workflow=workflow, + simulate_until_node__isnull=True, + ).prefetch_related("workflow__automation__workspace") + + def get_workflow_history( + self, history_id: int, base_queryset: Optional[QuerySet] = None + ) -> AutomationWorkflowHistory: + """ + Returns a AutomationWorkflowHistory by its ID. + + :param history_id: The ID of the AutomationWorkflowHistory. + :param base_queryset: Can be provided to already filter or apply performance + improvements to the queryset when it's being executed. + :raises AutomationWorkflowHistoryDoesNotExist: If the history doesn't exist. 
+ :return: The model instance of the AutomationWorkflowHistory + """ + + if base_queryset is None: + base_queryset = AutomationWorkflowHistory.objects.all() + + try: + return base_queryset.select_related("workflow__automation__workspace").get( + id=history_id + ) + except AutomationWorkflowHistory.DoesNotExist: + raise AutomationWorkflowHistoryDoesNotExist(history_id) def create_workflow_history( self, workflow: AutomationWorkflow, started_on: datetime, is_test_run: bool, + event_payload: Optional[Union[Dict, List[Dict]]] = None, + simulate_until_node: Optional[AutomationNode] = None, status: HistoryStatusChoices = HistoryStatusChoices.STARTED, completed_on: Optional[datetime] = None, message: str = "", ) -> AutomationWorkflowHistory: + """Creates a history entry for a Workflow run.""" + return AutomationWorkflowHistory.objects.create( workflow=workflow, started_on=started_on, is_test_run=is_test_run, + simulate_until_node=simulate_until_node, + event_payload=event_payload, status=status, completed_on=completed_on, message=message, ) + + def create_node_history( + self, + workflow_history: AutomationWorkflowHistory, + node: AutomationNode, + started_on: datetime, + status: HistoryStatusChoices = HistoryStatusChoices.STARTED, + completed_on: Optional[datetime] = None, + message: str = "", + ) -> AutomationNodeHistory: + """Creates a history entry for a Node dispatch.""" + + return AutomationNodeHistory.objects.create( + workflow_history=workflow_history, + node=node, + started_on=started_on, + status=status, + completed_on=completed_on, + message=message, + ) + + def create_node_result( + self, + node_history: AutomationNodeHistory, + result: Optional[Union[Dict, List[Dict]]] = None, + iteration: int = 0, + ) -> AutomationNodeResult: + """Saves the result of a Node dispatch.""" + + result = result if result else {} + return AutomationNodeResult.objects.create( + node_history=node_history, + iteration=iteration, + result=result, + ) diff --git 
a/backend/src/baserow/contrib/automation/history/models.py b/backend/src/baserow/contrib/automation/history/models.py index 6fcdc6d24c..c88eb6e483 100644 --- a/backend/src/baserow/contrib/automation/history/models.py +++ b/backend/src/baserow/contrib/automation/history/models.py @@ -9,7 +9,9 @@ class AutomationHistory(models.Model): message = models.TextField() - is_test_run = models.BooleanField() + is_test_run = models.BooleanField( + db_default=False + ) # TODO ZDM: Remove after next release status = models.CharField( choices=HistoryStatusChoices.choices, @@ -25,5 +27,63 @@ class AutomationWorkflowHistory(AutomationHistory): workflow = models.ForeignKey( "automation.AutomationWorkflow", on_delete=models.CASCADE, - related_name="workflow_history", + related_name="workflow_histories", ) + simulate_until_node = models.ForeignKey( + "automation.AutomationNode", + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="simulation_histories", + ) + + is_test_run = models.BooleanField( + db_default=False, + help_text="True when the workflow is being simulated.", + ) + + event_payload = models.JSONField( + db_default=None, + null=True, + blank=True, + help_text="Event payload received by the workflow.", + ) + + +class AutomationNodeHistory(AutomationHistory): + workflow_history = models.ForeignKey( + "automation.AutomationWorkflowHistory", + on_delete=models.CASCADE, + related_name="node_histories", + ) + node = models.ForeignKey( + "automation.AutomationNode", + on_delete=models.CASCADE, + related_name="node_histories", + ) + + class Meta: + indexes = [ + models.Index(fields=["workflow_history", "node"]), + ] + + +class AutomationNodeResult(models.Model): + node_history = models.ForeignKey( + "automation.AutomationNodeHistory", + on_delete=models.CASCADE, + related_name="node_results", + ) + + iteration = models.PositiveIntegerField( + db_default=0, + help_text="Keeps track of the current iteration of the Iterator node.", + ) + + result = models.JSONField( 
+ db_default={}, + help_text="Contains node results.", + ) + + class Meta: + unique_together = [["node_history", "iteration"]] diff --git a/backend/src/baserow/contrib/automation/history/service.py b/backend/src/baserow/contrib/automation/history/service.py index c567456247..772ebe3d7f 100644 --- a/backend/src/baserow/contrib/automation/history/service.py +++ b/backend/src/baserow/contrib/automation/history/service.py @@ -15,7 +15,7 @@ def __init__(self): self.handler = AutomationHistoryHandler() self.workflow_handler = AutomationWorkflowHandler() - def get_workflow_history( + def get_workflow_histories( self, user: AbstractUser, workflow_id: int ) -> QuerySet[AutomationWorkflowHistory]: """ @@ -35,4 +35,4 @@ def get_workflow_history( context=workflow, ) - return self.handler.get_workflow_history(workflow) + return self.handler.get_workflow_histories(workflow) diff --git a/backend/src/baserow/contrib/automation/migrations/0024_automationworkflowhistory_event_payload_and_more.py b/backend/src/baserow/contrib/automation/migrations/0024_automationworkflowhistory_event_payload_and_more.py new file mode 100644 index 0000000000..4f25a758e3 --- /dev/null +++ b/backend/src/baserow/contrib/automation/migrations/0024_automationworkflowhistory_event_payload_and_more.py @@ -0,0 +1,64 @@ +# Generated by Django 5.2.11 on 2026-03-04 11:08 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('automation', '0001_squashed_0023_initial'), + ] + + operations = [ + migrations.AddField( + model_name='automationworkflowhistory', + name='event_payload', + field=models.JSONField(blank=True, db_default=None, help_text='Event payload received by the workflow.', null=True), + ), + migrations.AddField( + model_name='automationworkflowhistory', + name='simulate_until_node', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='simulation_histories', 
to='automation.automationnode'), + ), + migrations.AlterField( + model_name='automationworkflowhistory', + name='is_test_run', + field=models.BooleanField(db_default=False, help_text='True when the workflow is being simulated.'), + ), + migrations.AlterField( + model_name='automationworkflowhistory', + name='workflow', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='workflow_histories', to='automation.automationworkflow'), + ), + migrations.CreateModel( + name='AutomationNodeHistory', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('started_on', models.DateTimeField()), + ('completed_on', models.DateTimeField(blank=True, null=True)), + ('message', models.TextField()), + ('is_test_run', models.BooleanField(db_default=False)), + ('status', models.CharField(choices=[('success', 'Success'), ('error', 'Error'), ('disabled', 'Disabled'), ('started', 'Started')], max_length=8)), + ('node', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='node_histories', to='automation.automationnode')), + ('workflow_history', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='node_histories', to='automation.automationworkflowhistory')), + ], + ), + migrations.CreateModel( + name='AutomationNodeResult', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('iteration', models.PositiveIntegerField(db_default=0, help_text='Keeps track of the current iteration of the Iterator node.')), + ('result', models.JSONField(db_default={}, help_text='Contains node results.')), + ('node_history', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='node_results', to='automation.automationnodehistory')), + ], + ), + migrations.AddIndex( + model_name='automationnodehistory', + index=models.Index(fields=['workflow_history', 'node'], 
name='automation__workflo_fa3be6_idx'), + ), + migrations.AlterUniqueTogether( + name='automationnoderesult', + unique_together={('node_history', 'iteration')}, + ), + ] diff --git a/backend/src/baserow/contrib/automation/nodes/handler.py b/backend/src/baserow/contrib/automation/nodes/handler.py index ffceb92342..5391089162 100644 --- a/backend/src/baserow/contrib/automation/nodes/handler.py +++ b/backend/src/baserow/contrib/automation/nodes/handler.py @@ -3,7 +3,10 @@ from django.core.files.storage import Storage from django.db.models import QuerySet +from django.utils import timezone +from celery.canvas import Signature, chain, group +from loguru import logger from opentelemetry import trace from baserow.contrib.automation.automation_dispatch_context import ( @@ -11,10 +14,17 @@ ) from baserow.contrib.automation.constants import IMPORT_SERIALIZED_IMPORTING from baserow.contrib.automation.formula_importer import import_formula +from baserow.contrib.automation.history.constants import HistoryStatusChoices +from baserow.contrib.automation.history.exceptions import ( + AutomationWorkflowHistoryDoesNotExist, +) +from baserow.contrib.automation.history.handler import AutomationHistoryHandler +from baserow.contrib.automation.history.models import ( + AutomationNodeHistory, +) from baserow.contrib.automation.models import AutomationWorkflow from baserow.contrib.automation.nodes.exceptions import ( AutomationNodeDoesNotExist, - AutomationNodeMisconfiguredService, ) from baserow.contrib.automation.nodes.models import AutomationNode from baserow.contrib.automation.nodes.node_types import ( @@ -22,6 +32,10 @@ AutomationNodeType, ) from baserow.contrib.automation.nodes.registries import automation_node_type_registry +from baserow.contrib.automation.nodes.signals import automation_node_updated +from baserow.contrib.automation.nodes.tasks import ( + dispatch_node_celery_task, +) from baserow.contrib.automation.nodes.types import AutomationNodeDict from baserow.core.cache import 
local_cache from baserow.core.db import specific_iterator @@ -349,66 +363,176 @@ def import_node_only( return node_instance + def _handle_workflow_error( + self, + node_history: AutomationNodeHistory, + error: str, + ) -> None: + now = timezone.now() + node_history.workflow_history.completed_on = now + node_history.workflow_history.message = error + node_history.workflow_history.status = HistoryStatusChoices.ERROR + node_history.workflow_history.save() + + node_history.completed_on = now + node_history.message = error + node_history.status = HistoryStatusChoices.ERROR + node_history.save() + def dispatch_node( self, - node: "AutomationNode", - dispatch_context: AutomationDispatchContext, - allowed_nodes=None, - ): + node_id: int, + history_id: int, + current_iterations: Optional[Dict[int, int]] = None, + ) -> Signature | None: """ - Dispatch one node and recursively dispatch the next nodes. - - :param node: The node to start with. - :param dispatch_context: The context in which the workflow is being dispatched, - which contains the event payload and other relevant data. - :param allowed_nodes: if set, only the nodes from the list will be dispatched. + Dispatch a single node and return a canvas for the next nodes. + + :param node_id: The node to dispatch. + :param history_id: The AutomationWorkflowHistory ID from which the + workflow's event payload and node results are derived. + :param current_iterations: Used by the Iterator node's children. + :return result: A signature is returned if there is a next node to + dispatch, otherwise returns None. 
""" - if dispatch_context.simulate_until_node and allowed_nodes is None: + from baserow.contrib.automation.workflows.handler import ( + AutomationWorkflowHandler, + ) + + history_handler = AutomationHistoryHandler() + + try: + workflow_history = history_handler.get_workflow_history( + history_id=history_id + ) + except AutomationWorkflowHistoryDoesNotExist as e: + logger.error(str(e)) + return None + + node = self.get_node(node_id) + simulate_until_node = ( + node.workflow.get_graph().get_node(workflow_history.simulate_until_node_id) + if workflow_history.simulate_until_node_id + else None + ) + + if simulate_until_node: allowed_nodes = { - *dispatch_context.simulate_until_node.get_previous_nodes(), - dispatch_context.simulate_until_node, + *simulate_until_node.get_previous_nodes(), + simulate_until_node, } + if node not in allowed_nodes: + # Return early as the node is not in the path leading to + # the simulated node. + return None + + node_history = history_handler.create_node_history( + workflow_history=workflow_history, + node=node, + started_on=timezone.now(), + ) - if allowed_nodes is not None and node not in allowed_nodes: - # Return early as the node is not on the path until the simulated node - return + dispatch_context = AutomationDispatchContext( + node.workflow, + history_id, + event_payload=workflow_history.event_payload, + simulate_until_node=workflow_history.simulate_until_node, + current_iterations=current_iterations, + ) node_type: Type[AutomationNodeActionNodeType] = node.get_type() + try: dispatch_result = node_type.dispatch(node, dispatch_context) - dispatch_context.after_dispatch(node, dispatch_result) - - # Return early if this is a simulated dispatch - if until_node := dispatch_context.simulate_until_node: - if until_node.id == node.id: - return - - if children := node.get_children(): - node_data = dispatch_result.data["results"] - - if dispatch_context.simulate_until_node: - iterations = [0] - else: - iterations = range(len(node_data)) + 
except ServiceImproperlyConfiguredDispatchException as e: + error = f"The node {node.id} is misconfigured and cannot be dispatched. {str(e)}" + self._handle_workflow_error(node_history, error) + return None + except Exception as e: + original_workflow = AutomationWorkflowHandler().get_original_workflow( + node.workflow + ) + error = ( + f"Unexpected error while running workflow {original_workflow.id}. " + f"Error: {str(e)}" + ) + logger.exception(error) + self._handle_workflow_error(node_history, error) + return None + + iteration_index = 0 + parent_nodes = node.get_parent_nodes() + if parent_nodes: + # Use the normalized iteration index from the context. + iteration_index = dispatch_context.current_iterations[parent_nodes[-1].id] + + history_handler.create_node_result( + node_history=node_history, + result=dispatch_result.data, + iteration=iteration_index, + ) - for index in iterations: - sub_dispatch_context = dispatch_context.clone() - sub_dispatch_context.set_current_iteration(node, index) + # Return early if this is a simulation as we've reached the + # simulated node. + if until_node := simulate_until_node: + if until_node.id == node.id: + until_node.service.specific.refresh_from_db(fields=["sample_data"]) + automation_node_updated.send(self, user=None, node=until_node) + return None + + to_chain = [] + if children := node.get_children(): + node_data = dispatch_result.data["results"] + + # For simulations, we only need the first iteration. 
+ if simulate_until_node: + iterations = [0] + else: + iterations = range(len(node_data)) + + groups_to_chain = [] + for index in iterations: + child_iterations = { + **dispatch_context.current_iterations, + node.id: index, + } + groups_to_chain.append( + group( + [ + dispatch_node_celery_task.si( + c.id, history_id, child_iterations + ) + for c in children + ] + ), + ) - # dispatch context build - for child in children: - self.dispatch_node( - child, sub_dispatch_context, allowed_nodes=allowed_nodes + if groups_to_chain: + canvas = chain(*groups_to_chain) + to_chain.append(canvas) + + now = timezone.now() + node_history.completed_on = now + node_history.status = HistoryStatusChoices.SUCCESS + node_history.save() + + # Handle non-iterator nodes, including iterator children. + next_nodes = node.get_next_nodes(dispatch_result.output_uid) + if next_nodes: + to_chain.append( + group( + [ + dispatch_node_celery_task.si( + n.id, history_id, current_iterations ) + for n in next_nodes + ] + ), + ) - next_nodes = node.get_next_nodes(dispatch_result.output_uid) - - for next_node in next_nodes: - self.dispatch_node( - next_node, dispatch_context, allowed_nodes=allowed_nodes - ) - except ServiceImproperlyConfiguredDispatchException as e: - raise AutomationNodeMisconfiguredService( - f"The node {node.id} is misconfigured and cannot be dispatched. 
{str(e)}" - ) from e + if to_chain: + return chain(*to_chain) + else: + # This is the end of this branch + return None diff --git a/backend/src/baserow/contrib/automation/nodes/tasks.py b/backend/src/baserow/contrib/automation/nodes/tasks.py new file mode 100644 index 0000000000..0c80c29171 --- /dev/null +++ b/backend/src/baserow/contrib/automation/nodes/tasks.py @@ -0,0 +1,39 @@ +from typing import Dict, Optional + +from celery.canvas import Signature + +from baserow.config.celery import app +from baserow.core.db import atomic_with_retry_on_deadlock + + +@app.task(bind=True, queue="automation_workflow") +def dispatch_node_celery_task( + self, + node_id: int, + history_id: int, + current_iterations: Optional[Dict[int, int]] = None, +): + from baserow.contrib.automation.nodes.handler import AutomationNodeHandler + + # The atomic context should only wrap the dispatch_node() call. If + # it also wraps `self.replace()`, which internally raises `Ignore`, + # the rollback will cause the node result to not be persisted. + @atomic_with_retry_on_deadlock() + def _dispatch(): + return AutomationNodeHandler().dispatch_node( + node_id, + history_id, + current_iterations=current_iterations, + ) + + result = _dispatch() + + # When result is a Signature (chord, group, etc), it represents the next + # node that needs to be dispatched as an async task. + # + # We call `self.replace()` which internally calls `.delay()` then + # raises `Ignore` to signal to Celery that the current task should be + # replaced. This results in the signature (next node) to be picked up + # by a worker (which again calls dispatch_node_celery_task). 
+ if isinstance(result, Signature): + return self.replace(result) diff --git a/backend/src/baserow/contrib/automation/workflows/handler.py b/backend/src/baserow/contrib/automation/workflows/handler.py index fe5c1e9d86..899cb29e17 100644 --- a/backend/src/baserow/contrib/automation/workflows/handler.py +++ b/backend/src/baserow/contrib/automation/workflows/handler.py @@ -10,7 +10,7 @@ from django.db.models import QuerySet from django.utils import timezone -from loguru import logger +from celery.canvas import chain from opentelemetry import trace from baserow.contrib.automation.automation_dispatch_context import ( @@ -24,8 +24,10 @@ from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.contrib.automation.history.models import AutomationWorkflowHistory from baserow.contrib.automation.models import Automation +from baserow.contrib.automation.nodes.handler import AutomationNodeHandler from baserow.contrib.automation.nodes.models import AutomationNode from baserow.contrib.automation.nodes.signals import automation_node_updated +from baserow.contrib.automation.nodes.tasks import dispatch_node_celery_task from baserow.contrib.automation.nodes.types import AutomationNodeDict from baserow.contrib.automation.types import AutomationWorkflowDict from baserow.contrib.automation.workflows.constants import ( @@ -42,13 +44,15 @@ ) from baserow.contrib.automation.workflows.models import AutomationWorkflow from baserow.contrib.automation.workflows.signals import automation_workflow_updated -from baserow.contrib.automation.workflows.tasks import start_workflow_celery_task +from baserow.contrib.automation.workflows.tasks import ( + handle_workflow_dispatch_done, + start_workflow_celery_task, +) from baserow.contrib.automation.workflows.types import UpdatedAutomationWorkflow from baserow.core.cache import global_cache, local_cache from baserow.core.exceptions import IdDoesNotExist from baserow.core.psycopg import is_unique_violation_error from 
baserow.core.registries import ImportExportConfig -from baserow.core.services.exceptions import DispatchException from baserow.core.storage import ExportZipFile, get_default_storage from baserow.core.telemetry.utils import baserow_trace_methods from baserow.core.trash.handler import TrashHandler @@ -387,8 +391,6 @@ def export_workflow( :return: The serialized version. """ - from baserow.contrib.automation.nodes.handler import AutomationNodeHandler - serialized_nodes = [ AutomationNodeHandler().export_node( n, files_zip=files_zip, storage=storage, cache=cache @@ -442,8 +444,6 @@ def import_nodes( :return: A list of the newly created nodes. """ - from baserow.contrib.automation.nodes.handler import AutomationNodeHandler - imported_nodes = [] imported_nodes = AutomationNodeHandler().import_nodes( @@ -688,6 +688,28 @@ def before_run(self, workflow: AutomationWorkflow) -> None: self._check_too_many_errors(workflow) + self._clear_old_history(workflow) + + def _clear_old_history(self, workflow: AutomationWorkflow) -> None: + """ + Clear any old history entries related to the workflow. + + It will delete any history entries that are older than MAX_HISTORY_DAYS and only + keep the most recent MAX_HISTORY_ENTRIES entries. 
+ """ + + oldest_history_date = timezone.now() - timedelta( + days=settings.AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS + ) + workflow.workflow_histories.filter(started_on__lt=oldest_history_date).delete() + + history_ids_to_keep = list( + workflow.workflow_histories.order_by("-started_on").values_list( + "id", flat=True + )[: settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES] + ) + workflow.workflow_histories.exclude(id__in=history_ids_to_keep).delete() + def _get_rate_limit_cache_key(self, workflow_id: int) -> str: return WORKFLOW_RATE_LIMIT_CACHE_PREFIX.format(workflow_id) @@ -928,7 +950,10 @@ def toggle_test_run( dispatch_context = AutomationDispatchContext( workflow, - None, + # This is a placeholder value, no actual history exists yet + # (it's created later in start_workflow). This is fine + # for now, because get_sample_data() doesn't use history. + history_id=0, simulate_until_node=simulate_until_node, ) if workflow.can_immediately_be_tested() or ( @@ -945,72 +970,60 @@ def toggle_test_run( def start_workflow( self, - workflow: int, + workflow: AutomationWorkflow, event_payload: Optional[Union[Dict, List[Dict]]], - simulate_until_node: Optional[int] = None, + simulate_until_node_id: Optional[int] = None, ) -> None: """Runs the workflow.""" - from baserow.contrib.automation.nodes.handler import AutomationNodeHandler - original_workflow = self.get_original_workflow(workflow) # If the currently running workflow is an unpublished workflow then we are # testing it. 
is_test_run = original_workflow == workflow - is_simulation = simulate_until_node is not None - - dispatch_context = AutomationDispatchContext( - workflow, - event_payload, - simulate_until_node=simulate_until_node, + simulate_until_node = ( + workflow.get_graph().get_node(simulate_until_node_id) + if simulate_until_node_id + else None ) - start_time = timezone.now() - history_handler = AutomationHistoryHandler() + history = history_handler.create_workflow_history( + original_workflow, + started_on=timezone.now(), + is_test_run=is_test_run, + event_payload=event_payload, + simulate_until_node=simulate_until_node, + ) - if not is_simulation: - # No history stored in simulation, we want to populate the node sample data - history = history_handler.create_workflow_history( - original_workflow, - started_on=start_time, - is_test_run=is_test_run, - ) + error: Optional[str] = None + history_status: Optional[HistoryStatusChoices] = None try: self.before_run(original_workflow) - AutomationNodeHandler().dispatch_node( - workflow.get_trigger(), dispatch_context - ) except AutomationWorkflowTooManyErrors as e: - history_message = str(e) + error = str(e) history_status = HistoryStatusChoices.DISABLED self.disable_workflow(workflow) - except (DispatchException, AutomationWorkflowBeforeRunError) as e: - history_message = str(e) + except AutomationWorkflowBeforeRunError as e: + error = str(e) history_status = HistoryStatusChoices.ERROR - except Exception as e: - history_message = ( - f"Unexpected error while running workflow {original_workflow.id}. 
" - f"Error: {str(e)}" - ) - history_status = HistoryStatusChoices.ERROR - logger.exception(history_message) - else: - history_message = "" - history_status = HistoryStatusChoices.SUCCESS - finally: - if not is_simulation: - history.completed_on = timezone.now() - history.message = history_message - history.status = history_status - history.save() - else: - # sample_data was updated as it's a simulation we should tell to - # the frontend - simulate_until_node.service.specific.refresh_from_db( - fields=["sample_data"] - ) - automation_node_updated.send(self, user=None, node=simulate_until_node) + + if error is not None and history_status is not None: + history.completed_on = timezone.now() + history.message = error + history.status = history_status + history.save() + return + + return chain( + dispatch_node_celery_task.si( + workflow.get_trigger().id, + history.id, + ), + handle_workflow_dispatch_done.si( + history_id=history.id if not simulate_until_node_id else None, + simulate_until_node_id=simulate_until_node_id, + ), + ) diff --git a/backend/src/baserow/contrib/automation/workflows/tasks.py b/backend/src/baserow/contrib/automation/workflows/tasks.py index bd27c5cdab..33a0a3233e 100644 --- a/backend/src/baserow/contrib/automation/workflows/tasks.py +++ b/backend/src/baserow/contrib/automation/workflows/tasks.py @@ -1,30 +1,67 @@ from typing import Dict, List, Optional, Union +from django.utils import timezone + +from celery.canvas import Signature + from baserow.config.celery import app +from baserow.contrib.automation.history.constants import HistoryStatusChoices +from baserow.contrib.automation.history.models import AutomationWorkflowHistory from baserow.core.db import atomic_with_retry_on_deadlock -@app.task(bind=True, queue="automation_workflow") -@atomic_with_retry_on_deadlock() +@app.task(queue="automation_workflow") def start_workflow_celery_task( - self, workflow_id: int, event_payload: Optional[Union[Dict, List[Dict]]], simulate_until_node_id: 
Optional[int] = None, ): - from baserow.contrib.automation.nodes.handler import AutomationNodeHandler from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler - workflow = AutomationWorkflowHandler().get_workflow(workflow_id) + @atomic_with_retry_on_deadlock() + def _start(): + workflow = AutomationWorkflowHandler().get_workflow(workflow_id) + return AutomationWorkflowHandler().start_workflow( + workflow, + event_payload, + simulate_until_node_id=simulate_until_node_id, + ) + + result = _start() + + if isinstance(result, Signature): + # Schedule the workflow to be executed. We use delay() here instead of + # replace() because replace internally calls `result.get()` which isn't + # allowed in eager mode (which is used by tests). + result.delay() + + +@app.task +def handle_workflow_dispatch_done( + history_id: Optional[int] = None, + simulate_until_node_id: Optional[int] = None, +): + """ + Hook for any post-workflow dispatch handling. + + If history_id is provided, the workflow's history is updated to 'success'. + + If simulate_until_node_id is provided, the related workflow history is deleted. + """ - simulate_until_node = ( - AutomationNodeHandler().get_node(simulate_until_node_id) - if simulate_until_node_id - else None - ) + if simulate_until_node_id: + AutomationWorkflowHistory.objects.filter( + simulate_until_node_id=simulate_until_node_id + ).delete() - AutomationWorkflowHandler().start_workflow( - workflow, - event_payload, - simulate_until_node=simulate_until_node, - ) + if history_id: + # Only update the history if it's still started. + # If the workflow history was marked as failed by a specific node, we + # don't want to overwrite it. 
+ AutomationWorkflowHistory.objects.filter( + id=history_id, + status=HistoryStatusChoices.STARTED, + ).update( + status=HistoryStatusChoices.SUCCESS, + completed_on=timezone.now(), + ) diff --git a/backend/src/baserow/test_utils/fixtures/automation_node.py b/backend/src/baserow/test_utils/fixtures/automation_node.py index 92c9053d19..24d470e516 100644 --- a/backend/src/baserow/test_utils/fixtures/automation_node.py +++ b/backend/src/baserow/test_utils/fixtures/automation_node.py @@ -19,6 +19,7 @@ LocalBaserowUpdateRowNodeType, ) from baserow.contrib.automation.nodes.registries import automation_node_type_registry +from baserow.contrib.automation.workflows.constants import WorkflowState from baserow.contrib.integrations.core.models import CoreRouterServiceEdge from baserow.core.cache import local_cache from baserow.core.services.registries import service_type_registry @@ -181,3 +182,149 @@ def create_http_trigger_node(self, user=None, **kwargs): type=CoreHTTPTriggerNodeType.type, **kwargs, ) + + def iterator_graph_fixture(self, create_after_iteration_node: bool = True): + """ + Fixture that creates the following graph: + - trigger_node + - iterator_node + - iterator_child_1 + - iterator_child_2 + - after_iteration_node + + trigger sample data are + [ + {"Name": "Apple", "Color": "Red"}, + {"Name": "Banana", "Color": "Yellow"}, + ] + """ + + user = self.create_user() + + trigger_table, trigger_table_fields, _ = self.build_table( + user=user, + columns=[("Name", "text"), ("Color", "text")], + rows=[], + ) + iterator_child_1_table, iterator_child_1_table_fields, _ = self.build_table( + user=user, + columns=[("Name", "text")], + rows=[], + ) + iterator_child_2_table, iterator_child_2_table_fields, _ = self.build_table( + user=user, + columns=[("Name", "text")], + rows=[], + ) + after_iteration_table, after_iteration_table_fields, _ = self.build_table( + user=user, + columns=[("Name", "text")], + rows=[], + ) + + integration = 
self.create_local_baserow_integration(user=user) + + workflow = self.create_automation_workflow( + user=user, + state=WorkflowState.LIVE, + trigger_type="local_baserow_rows_created", + trigger_service_kwargs={ + "table": trigger_table, + "integration": integration, + "sample_data": { + "data": { + "results": [ + { + trigger_table_fields[0].name: "Apple", + trigger_table_fields[1].name: "Red", + }, + { + trigger_table_fields[0].name: "Banana", + trigger_table_fields[1].name: "Yellow", + }, + ] + } + }, + }, + ) + + trigger = workflow.get_trigger() + + iterator_node = self.create_core_iterator_action_node( + workflow=workflow, + reference_node=trigger, + position="south", + output="", + service_kwargs={ + "source": f'get("previous_node.{trigger.id}")', + "integration": integration, + }, + ) + + iterator_child_1_node = self.create_local_baserow_create_row_action_node( + workflow=workflow, + reference_node=iterator_node, + position="child", + output="", + label="First iterator child", + service_kwargs={ + "table": iterator_child_1_table, + "integration": integration, + }, + ) + iterator_child_1_node.service.specific.field_mappings.create( + field=iterator_child_1_table_fields[0], + value=f'get("current_iteration.{iterator_node.id}.item.{trigger_table_fields[0].name}")', + ) + + iterator_child_2_node = self.create_local_baserow_create_row_action_node( + workflow=workflow, + reference_node=iterator_child_1_node, + position="south", + output="", + label="Second iterator child", + service_kwargs={ + "table": iterator_child_2_table, + "integration": integration, + }, + ) + iterator_child_2_node.service.specific.field_mappings.create( + field=iterator_child_2_table_fields[0], + value=f'get("current_iteration.{iterator_node.id}.item.{trigger_table_fields[1].name}")', + ) + + if create_after_iteration_node: + after_iteration_node = self.create_local_baserow_create_row_action_node( + workflow=workflow, + reference_node=iterator_node, + position="south", + output="", + 
label="After iterator", + service_kwargs={ + "table": after_iteration_table, + "integration": integration, + }, + ) + after_iteration_node.service.specific.field_mappings.create( + field=after_iteration_table_fields[0], + value=f'get("previous_node.{iterator_node.id}.*.{trigger_table_fields[0].name}")', + ) + else: + after_iteration_node = None + + return { + "workflow": workflow, + "trigger_node": trigger, + "trigger_table": trigger_table, + "trigger_table_fields": trigger_table_fields, + "iterator_node": iterator_node, + "iterator_child_1_node": iterator_child_1_node, + "iterator_child_1_table": iterator_child_1_table, + "iterator_child_1_table_fields": iterator_child_1_table_fields, + "iterator_child_2_node": iterator_child_2_node, + "iterator_child_2_table": iterator_child_2_table, + "iterator_child_2_table_fields": iterator_child_2_table_fields, + "after_iteration_node": after_iteration_node, + "after_iteration_table": after_iteration_table, + "after_iteration_table_fields": after_iteration_table_fields, + } diff --git a/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py b/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py index 37db535afc..7ad854d98e 100644 --- a/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py +++ b/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py @@ -541,7 +541,7 @@ def test_publish_workflow_error_invalid_workflow(api_client, data_fixture): @pytest.mark.django_db -def test_get_workflow_history(api_client, data_fixture): +def test_get_workflow_histories(api_client, data_fixture): user, token = data_fixture.create_user_and_token() history = data_fixture.create_workflow_history(user=user) @@ -567,7 +567,7 @@ def test_get_workflow_history(api_client, data_fixture): @pytest.mark.django_db -def test_get_workflow_history_invalid_workflow(api_client, data_fixture): +def test_get_workflow_histories_invalid_workflow(api_client, 
data_fixture): user, token = data_fixture.create_user_and_token() url = reverse(API_URL_WORKFLOW_HISTORY, kwargs={"workflow_id": 99999}) @@ -581,7 +581,7 @@ def test_get_workflow_history_invalid_workflow(api_client, data_fixture): @pytest.mark.django_db -def test_get_workflow_history_permission_error(api_client, data_fixture): +def test_get_workflow_histories_permission_error(api_client, data_fixture): user, _ = data_fixture.create_user_and_token() history = data_fixture.create_workflow_history(user=user) diff --git a/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py b/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py index 8ad804c4b2..cb99c4bed4 100644 --- a/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py +++ b/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py @@ -1,3 +1,5 @@ +from django.utils import timezone + import pytest from baserow.contrib.automation.automation_dispatch_context import ( @@ -7,29 +9,51 @@ CurrentIterationDataProviderType, PreviousNodeProviderType, ) +from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.core.formula.exceptions import InvalidFormulaContext -from baserow.core.services.types import DispatchResult @pytest.mark.django_db def test_previous_node_data_provider_get_data_chunk(data_fixture): - user = data_fixture.create_user() - workflow = data_fixture.create_automation_workflow(user=user) + workflow = data_fixture.create_automation_workflow() + workflow_history = AutomationHistoryHandler().create_workflow_history( + workflow, + timezone.now(), + False, + ) + trigger = workflow.get_trigger() + trigger_node_history = AutomationHistoryHandler().create_node_history( + workflow_history=workflow_history, + node=trigger, + started_on=timezone.now(), + ) + AutomationHistoryHandler().create_node_result( + node_history=trigger_node_history, + result={"results": 
[{"field_1": "Horse"}]}, + ) + first_action = data_fixture.create_local_baserow_create_row_action_node( workflow=workflow, ) + first_action_node_history = AutomationHistoryHandler().create_node_history( + workflow_history=workflow_history, + node=first_action, + started_on=timezone.now(), + ) + AutomationHistoryHandler().create_node_result( + node_history=first_action_node_history, + result={"field_2": "Badger"}, + ) + data_fixture.create_local_baserow_create_row_action_node( workflow=workflow, ) - dispatch_context = AutomationDispatchContext(workflow) - - dispatch_context.after_dispatch( - trigger, DispatchResult(data={"results": [{"field_1": "Horse"}]}) - ) - dispatch_context.after_dispatch( - first_action, DispatchResult(data={"field_2": "Badger"}) + dispatch_context = AutomationDispatchContext( + workflow, + workflow_history.id, + event_payload=workflow_history.event_payload, ) # `first_action` referencing the trigger input data. @@ -53,11 +77,26 @@ def test_previous_node_data_provider_get_data_chunk(data_fixture): PreviousNodeProviderType().get_data_chunk(dispatch_context, ["999", "field_3"]) assert exc.value.args[0] == "The previous node doesn't exist" - dispatch_context = AutomationDispatchContext(workflow) + workflow_history_2 = AutomationHistoryHandler().create_workflow_history( + workflow, + timezone.now(), + False, + ) + trigger_node_history_2 = AutomationHistoryHandler().create_node_history( + workflow_history=workflow_history_2, + node=trigger, + started_on=timezone.now(), + ) + AutomationHistoryHandler().create_node_result( + node_history=trigger_node_history_2, + result={"results": [{"field_1": "Horse"}]}, + ) - dispatch_context.after_dispatch( - trigger, DispatchResult(data={"results": [{"field_1": "Horse"}]}) + dispatch_context = AutomationDispatchContext( + workflow, + workflow_history_2.id, ) + # Existing node but after with pytest.raises(InvalidFormulaContext) as exc: PreviousNodeProviderType().get_data_chunk( @@ -91,29 +130,35 @@ def 
test_previous_node_data_provider_import_path(data_fixture): @pytest.mark.django_db def test_current_iteration_data_provider_get_data_chunk(data_fixture): - user = data_fixture.create_user() - workflow = data_fixture.create_automation_workflow(user=user) - trigger = workflow.get_trigger() + workflow = data_fixture.create_automation_workflow() + workflow_history = AutomationHistoryHandler().create_workflow_history( + workflow, + timezone.now(), + False, + ) iterator = data_fixture.create_core_iterator_action_node( workflow=workflow, ) data_fixture.create_local_baserow_create_row_action_node( workflow=workflow, ) - - dispatch_context = AutomationDispatchContext(workflow) - - dispatch_context.after_dispatch( - trigger, - DispatchResult(data={"results": [{"field_1": "Horse"}, {"field_1": "Duck"}]}), + node_history = AutomationHistoryHandler().create_node_history( + workflow_history=workflow_history, + node=iterator, + started_on=timezone.now(), ) - - dispatch_context.after_dispatch( - iterator, - DispatchResult(data={"results": [{"field_1": "Horse"}, {"field_1": "Duck"}]}), + AutomationHistoryHandler().create_node_result( + node_history=node_history, + result={"results": [{"field_1": "Horse"}, {"field_1": "Duck"}]}, + iteration=0, ) - dispatch_context.set_current_iteration(iterator, 0) + dispatch_context = AutomationDispatchContext( + workflow, + workflow_history.id, + event_payload=workflow_history.event_payload, + current_iterations={iterator.id: 0}, + ) assert ( CurrentIterationDataProviderType().get_data_chunk( @@ -122,7 +167,7 @@ def test_current_iteration_data_provider_get_data_chunk(data_fixture): == "Horse" ) - dispatch_context.set_current_iteration(iterator, 1) + dispatch_context.current_iterations[iterator.id] = 1 assert ( CurrentIterationDataProviderType().get_data_chunk( diff --git a/backend/tests/baserow/contrib/automation/history/test_history_handler.py b/backend/tests/baserow/contrib/automation/history/test_history_handler.py index b4b9e2f781..1238f2645a 
100644 --- a/backend/tests/baserow/contrib/automation/history/test_history_handler.py +++ b/backend/tests/baserow/contrib/automation/history/test_history_handler.py @@ -2,26 +2,29 @@ import pytest +from baserow.contrib.automation.history.exceptions import ( + AutomationWorkflowHistoryDoesNotExist, +) from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.contrib.automation.history.models import AutomationWorkflowHistory from baserow.contrib.automation.workflows.constants import WorkflowState @pytest.mark.django_db -def test_get_workflow_history_no_base_queryset(data_fixture): +def test_get_workflow_histories_no_base_queryset(data_fixture): workflow = data_fixture.create_automation_workflow() - result = AutomationHistoryHandler().get_workflow_history(workflow) + result = AutomationHistoryHandler().get_workflow_histories(workflow) # Should return an empty queryset, since this workflow has no history assert list(result) == [] @pytest.mark.django_db -def test_get_workflow_history_with_base_queryset(data_fixture): +def test_get_workflow_histories_with_base_queryset(data_fixture): workflow = data_fixture.create_automation_workflow() - result = AutomationHistoryHandler().get_workflow_history( + result = AutomationHistoryHandler().get_workflow_histories( workflow, AutomationWorkflowHistory.objects.all() ) @@ -30,7 +33,7 @@ def test_get_workflow_history_with_base_queryset(data_fixture): @pytest.mark.django_db -def test_get_workflow_history_returns_ordered_histories(data_fixture): +def test_get_workflow_histories_returns_ordered_histories(data_fixture): original_workflow = data_fixture.create_automation_workflow() history_1 = data_fixture.create_workflow_history( original_workflow=original_workflow @@ -39,7 +42,7 @@ def test_get_workflow_history_returns_ordered_histories(data_fixture): original_workflow=original_workflow ) - result = AutomationHistoryHandler().get_workflow_history(original_workflow) + result = 
AutomationHistoryHandler().get_workflow_histories(original_workflow) # Ensure latest is returned first assert list(result) == [history_2, history_1] @@ -63,3 +66,62 @@ def test_create_workflow_history(data_fixture): assert isinstance(history, AutomationWorkflowHistory) assert history.workflow == original_workflow + + +@pytest.mark.django_db +def test_get_workflow_histories_excludes_simulation_histories(data_fixture): + """ + Simulation histories are deleted by the dispatch_node() once the final + node is dispatched. However, we want to ensure they're excluded so that + a user doesn't accidentally see them while the simulation is running. + """ + + workflow = data_fixture.create_automation_workflow() + trigger = workflow.get_trigger() + + simulation_history = data_fixture.create_automation_workflow_history( + workflow=workflow, + simulate_until_node=trigger, + ) + + result = AutomationHistoryHandler().get_workflow_histories(workflow) + + assert len(result) == 0 + + +@pytest.mark.django_db +def test_get_workflow_history(data_fixture): + workflow = data_fixture.create_automation_workflow() + history = AutomationHistoryHandler().create_workflow_history( + workflow, + timezone.now(), + False, + ) + + result = AutomationHistoryHandler().get_workflow_history(history_id=history.id) + + assert result == history + + +@pytest.mark.django_db +def test_get_workflow_history_does_not_exist(data_fixture): + with pytest.raises(AutomationWorkflowHistoryDoesNotExist) as e: + AutomationHistoryHandler().get_workflow_history(history_id=100) + + assert str(e.value) == "The automation workflow history 100 does not exist." 
+ + +@pytest.mark.django_db +def test_get_workflow_history_respects_base_queryset(data_fixture): + workflow = data_fixture.create_automation_workflow() + history = AutomationHistoryHandler().create_workflow_history( + workflow, + timezone.now(), + False, + ) + + with pytest.raises(AutomationWorkflowHistoryDoesNotExist) as e: + AutomationHistoryHandler().get_workflow_history( + history_id=history.id, + base_queryset=AutomationWorkflowHistory.objects.exclude(id=history.id), + ) diff --git a/backend/tests/baserow/contrib/automation/history/test_history_service.py b/backend/tests/baserow/contrib/automation/history/test_history_service.py index f6ea29284f..cc6276b921 100644 --- a/backend/tests/baserow/contrib/automation/history/test_history_service.py +++ b/backend/tests/baserow/contrib/automation/history/test_history_service.py @@ -5,7 +5,7 @@ @pytest.mark.django_db -def test_get_workflow_history_permission_error(data_fixture): +def test_get_workflow_histories_permission_error(data_fixture): user = data_fixture.create_user() history = data_fixture.create_workflow_history(user=user) @@ -13,7 +13,7 @@ def test_get_workflow_history_permission_error(data_fixture): user_2 = data_fixture.create_user() with pytest.raises(UserNotInWorkspace) as e: - AutomationHistoryService().get_workflow_history(user_2, history.workflow.id) + AutomationHistoryService().get_workflow_histories(user_2, history.workflow.id) assert str(e.value) == ( f"User {user_2.email} doesn't belong to " @@ -22,7 +22,7 @@ def test_get_workflow_history_permission_error(data_fixture): @pytest.mark.django_db -def test_get_workflow_history_returns_ordered_histories(data_fixture): +def test_get_workflow_histories_returns_ordered_histories(data_fixture): user = data_fixture.create_user() original_workflow = data_fixture.create_automation_workflow(user=user) @@ -33,6 +33,8 @@ def test_get_workflow_history_returns_ordered_histories(data_fixture): original_workflow=original_workflow ) - result = 
AutomationHistoryService().get_workflow_history(user, original_workflow.id) + result = AutomationHistoryService().get_workflow_histories( + user, original_workflow.id + ) assert list(result) == [history_2, history_1] diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch.py b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch.py deleted file mode 100644 index d3ef034cbe..0000000000 --- a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch.py +++ /dev/null @@ -1,477 +0,0 @@ -import pytest - -from baserow.contrib.automation.automation_dispatch_context import ( - AutomationDispatchContext, -) -from baserow.contrib.automation.nodes.handler import AutomationNodeHandler -from baserow.contrib.automation.workflows.constants import WorkflowState -from baserow.contrib.database.rows.handler import RowHandler - - -@pytest.mark.django_db -def test_run_workflow_with_create_row_action(data_fixture): - user = data_fixture.create_user() - workspace = data_fixture.create_workspace(user=user) - integration = data_fixture.create_local_baserow_integration(user=user) - database = data_fixture.create_database_application(workspace=workspace) - trigger_table = data_fixture.create_database_table(database=database) - action_table = data_fixture.create_database_table(database=database) - action_table_field = data_fixture.create_text_field(table=action_table) - workflow = data_fixture.create_automation_workflow(user) - trigger = workflow.get_trigger() - trigger_service = trigger.service.specific - trigger_service.table = trigger_table - trigger_service.integration = integration - trigger_service.save() - action_node = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - service=data_fixture.create_local_baserow_upsert_row_service( - table=action_table, - integration=integration, - ), - ) - action_node.service.field_mappings.create(field=action_table_field, value="'Horse'") - - action_table_model = 
action_table.get_model() - assert action_table_model.objects.count() == 0 - - dispatch_context = AutomationDispatchContext(workflow, {}) - AutomationNodeHandler().dispatch_node(workflow.get_trigger(), dispatch_context) - - row = action_table_model.objects.first() - assert getattr(row, action_table_field.db_column) == "Horse" - assert dispatch_context.dispatch_history == [trigger.id, action_node.id] - - -@pytest.mark.django_db(transaction=True) -def test_run_workflow_with_create_row_action_and_advanced_formula(data_fixture): - user = data_fixture.create_user() - workspace = data_fixture.create_workspace(user=user) - integration = data_fixture.create_local_baserow_integration(user=user) - database = data_fixture.create_database_application(workspace=workspace) - - trigger_table, trigger_table_fields, _ = data_fixture.build_table( - user=user, - columns=[ - ("Food", "text"), - ("Spiciness", "number"), - ], - rows=[ - ["Paneer Tikka", 5], - ["Gobi Manchurian", 8], - ], - ) - - action_table, action_table_fields, action_rows = data_fixture.build_table( - database=database, - user=user, - columns=[("Name", "text")], - rows=[], - ) - workflow = data_fixture.create_automation_workflow(user, state="live") - trigger = workflow.get_trigger() - trigger_service = trigger.service.specific - trigger_service.table = trigger_table - trigger_service.integration = integration - trigger_service.save() - action_node = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - service=data_fixture.create_local_baserow_upsert_row_service( - table=action_table, - integration=integration, - ), - ) - action_node.service.field_mappings.create( - field=action_table_fields[0], - value=f"concat('The comparaison is ', " - f"get('previous_node.{trigger.id}.0.{trigger_table_fields[1].db_column}') > 7)", - ) - - action_table_model = action_table.get_model() - assert action_table_model.objects.count() == 0 - - # Triggers a row creation - RowHandler().create_rows( - user=user, - 
table=trigger_table, - model=trigger_table.get_model(), - rows_values=[ - { - trigger_table_fields[0].db_column: "Spice", - trigger_table_fields[1].db_column: "4.14", - }, - ], - skip_search_update=True, - ) - - row = action_table_model.objects.first() - assert getattr(row, action_table_fields[0].db_column) == "The comparaison is false" - - -@pytest.mark.django_db -def test_run_workflow_with_update_row_action(data_fixture): - user = data_fixture.create_user() - workspace = data_fixture.create_workspace(user=user) - integration = data_fixture.create_local_baserow_integration(user=user) - database = data_fixture.create_database_application(workspace=workspace) - trigger_table = data_fixture.create_database_table(database=database) - action_table = data_fixture.create_database_table(database=database) - action_table_field = data_fixture.create_text_field(table=action_table) - action_table_row = action_table.get_model().objects.create( - **{f"field_{action_table_field.id}": "Horse"} - ) - workflow = data_fixture.create_automation_workflow(user) - trigger = workflow.get_trigger() - trigger_service = trigger.service.specific - trigger_service.table = trigger_table - trigger_service.integration = integration - trigger_service.save() - action_node = data_fixture.create_local_baserow_update_row_action_node( - workflow=workflow, - service=data_fixture.create_local_baserow_upsert_row_service( - table=action_table, - integration=integration, - row_id=action_table_row.id, - ), - ) - action_node.service.field_mappings.create( - field=action_table_field, value="'Badger'" - ) - - dispatch_context = AutomationDispatchContext(workflow, {}) - AutomationNodeHandler().dispatch_node(workflow.get_trigger(), dispatch_context) - - action_table_row.refresh_from_db() - assert getattr(action_table_row, action_table_field.db_column) == "Badger" - assert dispatch_context.dispatch_history == [trigger.id, action_node.id] - - -@pytest.mark.django_db -def 
test_run_workflow_with_delete_row_action(data_fixture): - user = data_fixture.create_user() - workspace = data_fixture.create_workspace(user=user) - integration = data_fixture.create_local_baserow_integration(user=user) - database = data_fixture.create_database_application(workspace=workspace) - trigger_table = data_fixture.create_database_table(database=database) - action_table = data_fixture.create_database_table(database=database) - action_table_field = data_fixture.create_text_field(table=action_table) - action_table_row = action_table.get_model().objects.create( - **{f"field_{action_table_field.id}": "Mouse"} - ) - workflow = data_fixture.create_automation_workflow( - user=user, state=WorkflowState.LIVE - ) - trigger = workflow.get_trigger() - trigger_service = trigger.service.specific - trigger_service.table = trigger_table - trigger_service.integration = integration - trigger_service.save() - action_node = data_fixture.create_local_baserow_delete_row_action_node( - workflow=workflow, - service=data_fixture.create_local_baserow_delete_row_service( - table=action_table, - integration=integration, - row_id=action_table_row.id, - ), - ) - - assert action_table.get_model().objects.all().count() == 1 - - dispatch_context = AutomationDispatchContext(workflow, {}) - AutomationNodeHandler().dispatch_node(workflow.get_trigger(), dispatch_context) - - assert action_table.get_model().objects.all().count() == 0 - assert dispatch_context.dispatch_history == [trigger.id, action_node.id] - - -@pytest.mark.django_db -def test_run_workflow_with_router_action(data_fixture): - user = data_fixture.create_user() - workspace = data_fixture.create_workspace(user=user) - integration = data_fixture.create_local_baserow_integration(user=user) - database = data_fixture.create_database_application(workspace=workspace) - trigger_table = data_fixture.create_database_table(database=database) - workflow = data_fixture.create_automation_workflow( - user=user, - state=WorkflowState.LIVE, - 
trigger_service_kwargs={"table": trigger_table, "integration": integration}, - ) - - trigger = workflow.get_trigger() - - router_node = data_fixture.create_core_router_action_node( - workflow=workflow, - ) - - data_fixture.create_core_router_service_edge( - service=router_node.service, label="Edge 1", condition="'false'" - ) - - action_table = data_fixture.create_database_table(database=database) - action_table_field = data_fixture.create_text_field(table=action_table) - action_table_row = action_table.get_model().objects.create( - **{f"field_{action_table_field.id}": "Horse"} - ) - edge2 = data_fixture.create_core_router_service_edge( - service=router_node.service, - label="Edge 2", - condition="'true'", - skip_output_node=True, - ) - edge2_output_node = data_fixture.create_local_baserow_update_row_action_node( - workflow=workflow, - reference_node=router_node, - position="south", - output=edge2.uid, - service_kwargs={ - "table": action_table, - "integration": integration, - "row_id": action_table_row.id, - }, - ) - edge2_output_node.service.field_mappings.create( - field=action_table_field, value="'Badger'" - ) - - workflow.assert_reference( - { - "0": "local_baserow_rows_created", - "local_baserow_rows_created": {"next": {"": ["router"]}}, - "router": { - "next": { - "Edge 1": ["Edge 1 output node"], - "Edge 2": ["local_baserow_update_row"], - } - }, - "Edge 1 output node": {}, - "local_baserow_update_row": {}, - } - ) - - dispatch_context = AutomationDispatchContext(workflow, {}) - AutomationNodeHandler().dispatch_node(workflow.get_trigger(), dispatch_context) - - action_table_row.refresh_from_db() - assert getattr(action_table_row, action_table_field.db_column) == "Badger" - assert dispatch_context.dispatch_history == [ - trigger.id, - router_node.id, - edge2_output_node.id, - ] - - -@pytest.fixture -def iterator_graph_fixture(data_fixture): - """ - Fixture that creates the following graph: - rows_created -> iterator [ -> create_row -> create_row3 ] -> 
create_row2 - - trigger sample data are - [ - {"field_1": "value 1", "field_2": "other 1"}, - {"field_1": "value 2", "field_2": "other 2"}, - ] - """ - - user = data_fixture.create_user() - - trigger_table, trigger_table_fields, _ = data_fixture.build_table( - user=user, - columns=[("Name", "text")], - rows=[], - ) - - action_table, action_table_fields, _ = data_fixture.build_table( - user=user, - columns=[("Name", "text")], - rows=[], - ) - action2_table, action2_table_fields, _ = data_fixture.build_table( - user=user, - columns=[("Name", "text")], - rows=[], - ) - action3_table, action3_table_fields, _ = data_fixture.build_table( - user=user, - columns=[("Name", "text")], - rows=[], - ) - - integration = data_fixture.create_local_baserow_integration(user=user) - - workflow = data_fixture.create_automation_workflow( - user=user, - state=WorkflowState.LIVE, - trigger_type="local_baserow_rows_created", - trigger_service_kwargs={ - "table": trigger_table, - "integration": integration, - "sample_data": { - "data": { - "results": [ - {"field_1": "value 1", "field_2": "other 1"}, - {"field_1": "value 2", "field_2": "other 2"}, - ] - } - }, - }, - ) - - trigger = workflow.get_trigger() - - iterator_node = data_fixture.create_core_iterator_action_node( - workflow=workflow, - reference_node=trigger, - position="south", - output="", - service_kwargs={ - "source": f'get("previous_node.{trigger.id}")', - "integration": integration, - }, - ) - - action_node = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - reference_node=iterator_node, - position="child", - output="", - label="First action", - service_kwargs={"table": action_table, "integration": integration}, - ) - action_node.service.specific.field_mappings.create( - field=action_table_fields[0], - value=f'get("current_iteration.{iterator_node.id}.item.field_1")', - ) - - action2_node = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - 
reference_node=iterator_node, - position="south", - output="", - label="After iterator", - service_kwargs={"table": action2_table, "integration": integration}, - ) - action2_node.service.specific.field_mappings.create( - field=action2_table_fields[0], - value=f'get("previous_node.{iterator_node.id}.*.field_1")', - ) - - action3_node = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - reference_node=action_node, - position="south", - output="", - label="Second action", - service_kwargs={"table": action3_table, "integration": integration}, - ) - action3_node.service.specific.field_mappings.create( - field=action3_table_fields[0], - value=f'get("current_iteration.{iterator_node.id}.item.field_2")', - ) - - return { - "workflow": workflow, - "action_node": action_node, - "action_table": action_table, - "action_table_fields": action_table_fields, - "action2_table": action2_table, - "action2_table_fields": action2_table_fields, - "action3_table": action3_table, - "action3_table_fields": action3_table_fields, - } - - -@pytest.mark.django_db -def test_run_workflow_with_iterator_action(iterator_graph_fixture): - workflow = iterator_graph_fixture["workflow"] - action_table = iterator_graph_fixture["action_table"] - action_table_fields = iterator_graph_fixture["action_table_fields"] - action2_table = iterator_graph_fixture["action2_table"] - action2_table_fields = iterator_graph_fixture["action2_table_fields"] - action3_table = iterator_graph_fixture["action3_table"] - action3_table_fields = iterator_graph_fixture["action3_table_fields"] - - workflow.assert_reference( - { - "0": "local_baserow_rows_created", - "local_baserow_rows_created": {"next": {"": ["iterator"]}}, - "iterator": { - "children": ["First action"], - "next": {"": ["After iterator"]}, - }, - "First action": {"next": {"": ["Second action"]}}, - "Second action": {}, - "After iterator": {}, - } - ) - - dispatch_context = AutomationDispatchContext( - workflow, - { - "results": [ - 
{"field_1": "value 1", "field_2": "other 1"}, - {"field_1": "value 2", "field_2": "other 2"}, - ] - }, - ) - - AutomationNodeHandler().dispatch_node(workflow.get_trigger(), dispatch_context) - - # At this point all node should have been executed - rows = list(action_table.get_model().objects.all()) - assert len(rows) == 2 - - assert getattr(rows[0], action_table_fields[0].db_column) == "value 1" - assert getattr(rows[1], action_table_fields[0].db_column) == "value 2" - - rows2 = list(action2_table.get_model().objects.all()) - assert len(rows2) == 1 - assert getattr(rows2[0], action2_table_fields[0].db_column) == "value 1,value 2" - - rows3 = list(action3_table.get_model().objects.all()) - assert len(rows3) == 2 - - assert getattr(rows3[0], action3_table_fields[0].db_column) == "other 1" - assert getattr(rows3[1], action3_table_fields[0].db_column) == "other 2" - - -@pytest.mark.django_db -def test_run_workflow_with_iterator_action_simulate(iterator_graph_fixture): - workflow = iterator_graph_fixture["workflow"] - action_node = iterator_graph_fixture["action_node"] - action_table = iterator_graph_fixture["action_table"] - action_table_fields = iterator_graph_fixture["action_table_fields"] - action2_table = iterator_graph_fixture["action2_table"] - action3_table = iterator_graph_fixture["action3_table"] - - workflow.assert_reference( - { - "0": "local_baserow_rows_created", - "local_baserow_rows_created": {"next": {"": ["iterator"]}}, - "iterator": { - "children": ["First action"], - "next": {"": ["After iterator"]}, - }, - "First action": {"next": {"": ["Second action"]}}, - "Second action": {}, - "After iterator": {}, - } - ) - - dispatch_context = AutomationDispatchContext( - workflow, - simulate_until_node=action_node, - ) - AutomationNodeHandler().dispatch_node(workflow.get_trigger(), dispatch_context) - - # At this point only nodes until the action_node should have been executed - rows = list(action_table.get_model().objects.all()) - assert len(rows) == 1 - - 
assert getattr(rows[0], action_table_fields[0].db_column) == "value 1" - - rows2 = list(action2_table.get_model().objects.all()) - assert len(rows2) == 0 - - rows3 = list(action3_table.get_model().objects.all()) - assert len(rows3) == 0 diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py new file mode 100644 index 0000000000..ec6fb5d815 --- /dev/null +++ b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py @@ -0,0 +1,1190 @@ +from unittest.mock import ANY, patch + +import pytest +from celery.canvas import Signature + +from baserow.config.celery import clear_local +from baserow.contrib.automation.history.constants import HistoryStatusChoices +from baserow.contrib.automation.history.models import ( + AutomationNodeHistory, + AutomationNodeResult, + AutomationWorkflowHistory, +) +from baserow.contrib.automation.nodes.handler import AutomationNodeHandler +from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler +from baserow.contrib.automation.workflows.tasks import handle_workflow_dispatch_done +from baserow.test_utils.helpers import AnyInt, AnyStr + +TRIGGER_NODE_TYPE_PATH = ( + "baserow.contrib.automation.nodes.node_types.LocalBaserowRowsCreatedNodeTriggerType" +) +NODE_HANDLER_PATH = "baserow.contrib.automation.nodes.handler" + + +def assert_dispatches_next_node(result, *expected_tasks): + """ + Helper to assert that the correct signature is returned. 
+ + expected_tasks are tuples, e.g.: (node, history, iterations) + """ + + assert isinstance(result, Signature) + assert len(result.tasks) == len(expected_tasks) + for i, (node, history, iterations) in enumerate(expected_tasks): + task = result.tasks[i] + if hasattr(task, "tasks"): + assert len(task.tasks) == 1 + task = task.tasks[0] + assert task.args == (node.id, history.id, iterations) + + +def create_workflow( + data_fixture, + user=None, + action_node_type="create_row", + action_node_service_value=None, +): + if user is None: + user = data_fixture.create_user() + + workspace = data_fixture.create_workspace(user=user) + integration = data_fixture.create_local_baserow_integration(user=user) + database = data_fixture.create_database_application(workspace=workspace) + trigger_table = data_fixture.create_database_table(database=database) + trigger_table_field_a = data_fixture.create_text_field(table=trigger_table) + trigger_table_field_b = data_fixture.create_text_field(table=trigger_table) + action_table = data_fixture.create_database_table(database=database) + action_table_field = data_fixture.create_text_field(table=action_table) + + workflow = data_fixture.create_automation_workflow( + user, trigger_type="local_baserow_rows_created" + ) + trigger = workflow.get_trigger() + trigger_service = trigger.service.specific + trigger_service.table = trigger_table + trigger_service.integration = integration + trigger_service.save() + + action_table_row = None + + if action_node_type == "create_row": + action_node = data_fixture.create_local_baserow_create_row_action_node( + workflow=workflow, + previous_node=trigger, + service=data_fixture.create_local_baserow_upsert_row_service( + table=action_table, + integration=integration, + ), + ) + elif action_node_type == "update_row" and action_node_service_value: + action_table_row = action_table.get_model().objects.create( + **{f"field_{action_table_field.id}": action_node_service_value} + ) + action_node = 
data_fixture.create_local_baserow_update_row_action_node( + workflow=workflow, + previous_node=trigger, + service=data_fixture.create_local_baserow_upsert_row_service( + table=action_table, + integration=integration, + row_id=action_table_row.id, + ), + ) + elif action_node_type == "delete_row": + action_table_row = action_table.get_model().objects.create( + **{f"field_{action_table_field.id}": action_node_service_value} + ) + action_node = data_fixture.create_local_baserow_delete_row_action_node( + workflow=workflow, + previous_node=trigger, + service=data_fixture.create_local_baserow_delete_row_service( + table=action_table, + integration=integration, + row_id=action_table_row.id, + ), + ) + + if action_node_type in ("create_row", "update_row"): + action_node.service.field_mappings.create( + field=action_table_field, + value=f"get('previous_node.{trigger.id}.0.{trigger_table_field_a.db_column}')", + ) + + history = create_workflow_history( + data_fixture, workflow, [trigger_table_field_a, trigger_table_field_b] + ) + + return { + "user": "user", + "integration": integration, + "workflow": workflow, + "trigger_node": trigger, + "action_node": action_node, + "workflow_history": history, + "action_table": action_table, + "action_table_field": action_table_field, + "action_table_row": action_table_row, + "trigger_table": trigger_table, + "trigger_table_field_a": trigger_table_field_a, + "trigger_table_field_b": trigger_table_field_b, + } + + +def create_workflow_history(data_fixture, workflow, trigger_table_fields): + original_workflow = AutomationWorkflowHandler().get_original_workflow(workflow) + return data_fixture.create_automation_workflow_history( + workflow=original_workflow, + event_payload={ + "results": [ + { + "id": 100, + "order": "10.00000000000000000000", + trigger_table_fields[0].name: "Apple", + trigger_table_fields[1].name: "Red", + }, + { + "id": 101, + "order": "10.00000000000000000000", + trigger_table_fields[0].name: "Banana", + 
trigger_table_fields[1].name: "Yellow", + }, + ], + "has_next_page": False, + }, + ) + + +@pytest.mark.django_db +def test_dispatch_node_service_error(data_fixture): + user = data_fixture.create_user() + trigger_node = data_fixture.create_local_baserow_rows_created_trigger_node( + user=user + ) + # create action node without any table configured + data_fixture.create_local_baserow_create_row_action_node( + workflow=trigger_node.workflow + ) + original_workflow = AutomationWorkflowHandler().get_original_workflow( + trigger_node.workflow + ) + workflow_history = data_fixture.create_automation_workflow_history( + workflow=original_workflow + ) + + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + assert result is None + + workflow_history.refresh_from_db() + error = "is misconfigured and cannot be dispatched" + assert error in workflow_history.message + assert workflow_history.status == HistoryStatusChoices.ERROR + + node_history = AutomationNodeHistory.objects.get(workflow_history=workflow_history) + assert error in node_history.message + assert node_history.status == HistoryStatusChoices.ERROR + + +@pytest.mark.django_db +@patch(f"{TRIGGER_NODE_TYPE_PATH}.dispatch") +@patch(f"{NODE_HANDLER_PATH}.logger") +def test_dispatch_node_unexpected_error(mock_logger, mock_dispatch, data_fixture): + mock_dispatch.side_effect = ValueError("Unexpected error!") + + data = create_workflow(data_fixture) + trigger_node = data["trigger_node"] + workflow_history = data["workflow_history"] + + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + assert result is None + workflow_history.refresh_from_db() + error = ( + f"Unexpected error while running workflow {trigger_node.workflow.id}. " + "Error: Unexpected error!" 
+ ) + mock_logger.exception.assert_called_once_with(error) + assert error in workflow_history.message + assert workflow_history.status == HistoryStatusChoices.ERROR + + node_history = AutomationNodeHistory.objects.get(workflow_history=workflow_history) + assert error in node_history.message + assert node_history.status == HistoryStatusChoices.ERROR + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_trigger(data_fixture): + data = create_workflow(data_fixture) + trigger_node = data["trigger_node"] + action_node = data["action_node"] + workflow_history = data["workflow_history"] + + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + + handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() + assert workflow_history.message == "" + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + node_history = AutomationNodeHistory.objects.get(workflow_history=workflow_history) + assert node_history.message == "" + assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == 0 + assert node_result.result == workflow_history.event_payload + + assert_dispatches_next_node(result, (action_node, workflow_history, None)) + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_action_create_row(data_fixture): + data = create_workflow(data_fixture) + trigger_node = data["trigger_node"] + action_node = data["action_node"] + workflow_history = data["workflow_history"] + action_table = data["action_table"] + action_table_field = data["action_table_field"] + + # First dispatch the trigger + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + + assert_dispatches_next_node(result, (action_node, workflow_history, None)) + + assert action_table.get_model().objects.all().count() == 0 + + # Next dispatch 
the action + result = AutomationNodeHandler().dispatch_node( + action_node.id, + history_id=workflow_history.id, + ) + assert result is None + + # Make sure the action dispatched correctly + value = getattr( + action_table.get_model().objects.all()[0], action_table_field.db_column + ) + assert value == "Apple" + + handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() + assert workflow_history.message == "" + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + node_history = ( + AutomationNodeHistory.objects.filter(workflow_history=workflow_history) + .order_by("-id") + .first() + ) + assert node_history.message == "" + assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == 0 + assert node_result.result == { + action_table_field.name: "Apple", + "id": AnyInt(), + "order": AnyStr(), + } + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_iterator_children(data_fixture): + data = data_fixture.iterator_graph_fixture() + trigger_node = data["trigger_node"] + trigger_table_fields = data["trigger_table_fields"] + iterator_node = data["iterator_node"] + iterator_child_1_node = data["iterator_child_1_node"] + iterator_child_1_table = data["iterator_child_1_table"] + iterator_child_1_table_fields = data["iterator_child_1_table_fields"] + iterator_child_2_node = data["iterator_child_2_node"] + iterator_child_2_table_fields = data["iterator_child_2_table_fields"] + after_iteration_node = data["after_iteration_node"] + + workflow_history = create_workflow_history( + data_fixture, + trigger_node.workflow, + trigger_table_fields, + ) + + # First dispatch the trigger + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + assert_dispatches_next_node(result, (iterator_node, workflow_history, None)) + + assert 
iterator_child_1_table.get_model().objects.all().count() == 0 + + # Next dispatch the iterator node + result = AutomationNodeHandler().dispatch_node( + iterator_node.id, + history_id=workflow_history.id, + ) + # Clear the local cache between dispatch_node() calls to simulate + # how Celery clears the local cache between tasks in production. + clear_local() + + # result is a chain of chords + assert isinstance(result, Signature) + + # Make sure the iterator children's node history and results are persisted. + # There are two rows in the payload, so we expect two histories. + assert AutomationNodeHistory.objects.filter(node=iterator_child_1_node).count() == 0 + + result = AutomationNodeHandler().dispatch_node( + iterator_child_1_node.id, + history_id=workflow_history.id, + current_iterations={iterator_node.id: 0}, + ) + clear_local() + + assert_dispatches_next_node( + result, (iterator_child_2_node, workflow_history, {iterator_node.id: 0}) + ) + + # Manually dispatch child 1 for iteration 1 + result = AutomationNodeHandler().dispatch_node( + iterator_child_1_node.id, + history_id=workflow_history.id, + current_iterations={iterator_node.id: 1}, + ) + clear_local() + + assert_dispatches_next_node( + result, (iterator_child_2_node, workflow_history, {iterator_node.id: 1}) + ) + + node_histories = AutomationNodeHistory.objects.filter( + node=iterator_child_1_node, status=HistoryStatusChoices.SUCCESS + ).order_by("id") + assert len(node_histories) == 2 + + # workflow history should still be "started", since the final node + # hasn't been dispatched yet. 
+ workflow_history.refresh_from_db() + assert workflow_history.status == HistoryStatusChoices.STARTED + + # Dispatch the after iteration node + result = AutomationNodeHandler().dispatch_node( + after_iteration_node.id, + history_id=workflow_history.id, + ) + + # There are no next nodes + assert result is None + + # workflow history should be finally be updated as success + handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() + assert workflow_history.message == "" + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + +@pytest.mark.django_db +@patch(f"{NODE_HANDLER_PATH}.automation_node_updated") +def test_dispatch_node_dispatches_trigger_simulation( + mock_automation_node_updated, + data_fixture, +): + data = create_workflow(data_fixture) + trigger_node = data["trigger_node"] + trigger_table_field_a = data["trigger_table_field_a"] + trigger_table_field_b = data["trigger_table_field_b"] + + workflow_history = data["workflow_history"] + workflow_history.simulate_until_node = trigger_node + workflow_history.save() + + assert trigger_node.service.specific.sample_data is None + + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + + # There are no next nodes + assert result is None + + # Ensure workflow history is deleted, since we don't want history + # entries for simulations. 
+ handle_workflow_dispatch_done(simulate_until_node_id=trigger_node.id) + assert ( + AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() + is False + ) + + # Ensure the sample data is saved + trigger_node.service.specific.refresh_from_db() + assert trigger_node.service.specific.sample_data == { + "data": { + "results": [ + { + "id": AnyInt(), + "order": AnyStr(), + trigger_table_field_a.name: "Apple", + trigger_table_field_b.name: "Red", + }, + { + "id": AnyInt(), + "order": AnyStr(), + trigger_table_field_a.name: "Banana", + trigger_table_field_b.name: "Yellow", + }, + ], + "has_next_page": False, + }, + "status": 200, + "output_uid": "", + } + + mock_automation_node_updated.send.assert_called_once_with( + ANY, user=None, node=trigger_node + ) + + +@pytest.mark.django_db +@patch(f"{NODE_HANDLER_PATH}.automation_node_updated") +def test_dispatch_node_dispatches_action_simulation( + mock_automation_node_updated, + data_fixture, +): + data = create_workflow(data_fixture) + trigger_node = data["trigger_node"] + action_node = data["action_node"] + action_table_field = data["action_table_field"] + trigger_table_field_a = data["trigger_table_field_a"] + trigger_table_field_b = data["trigger_table_field_b"] + + workflow_history = data["workflow_history"] + workflow_history.simulate_until_node = action_node + workflow_history.save() + + assert action_node.service.specific.sample_data is None + + # Simulate the trigger first so that the dispatch context can populate + # previous_node_results from the database. 
+ result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + assert_dispatches_next_node(result, (action_node, workflow_history, None)) + + # Make sure the trigger node simulation saves a history entry + node_history = AutomationNodeHistory.objects.get(workflow_history=workflow_history) + assert node_history.message == "" + assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == 0 + assert node_result.result == { + "results": [ + { + "id": AnyInt(), + "order": AnyStr(), + trigger_table_field_a.name: "Apple", + trigger_table_field_b.name: "Red", + }, + { + "id": AnyInt(), + "order": AnyStr(), + trigger_table_field_a.name: "Banana", + trigger_table_field_b.name: "Yellow", + }, + ], + "has_next_page": False, + } + + # Now simulate the action + result = AutomationNodeHandler().dispatch_node( + action_node.id, + history_id=workflow_history.id, + ) + # There are no next nodes + assert result is None + + # Ensure the sample data is saved + action_node.service.specific.refresh_from_db() + assert action_node.service.specific.sample_data == { + "data": { + action_table_field.name: "Apple", + "id": AnyInt(), + "order": AnyStr(), + }, + "output_uid": "", + "status": 200, + } + + mock_automation_node_updated.send.assert_called_once_with( + ANY, user=None, node=action_node + ) + + # Ensure workflow history is deleted, since we don't want history + # entries for simulations. 
+ handle_workflow_dispatch_done(simulate_until_node_id=action_node.id) + assert ( + AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() + is False + ) + + +@pytest.mark.django_db +@patch(f"{NODE_HANDLER_PATH}.automation_node_updated") +def test_dispatch_node_dispatches_iterator_simulation( + mock_automation_node_updated, + data_fixture, +): + data = data_fixture.iterator_graph_fixture() + trigger_node = data["trigger_node"] + trigger_table_fields = data["trigger_table_fields"] + iterator_node = data["iterator_node"] + iterator_child_1_table_fields = data["iterator_child_1_table_fields"] + iterator_child_1_node = data["iterator_child_1_node"] + iterator_child_2_node = data["iterator_child_2_node"] + after_iteration_node = data["after_iteration_node"] + + workflow_history = create_workflow_history( + data_fixture, + trigger_node.workflow, + trigger_table_fields, + ) + workflow_history.simulate_until_node = iterator_child_2_node + workflow_history.save() + + # Ensure that the iterator node and children don't yet have sample data + assert iterator_child_2_node.service.specific.sample_data is None + + # Simulate the trigger first so that the dispatch context can populate + # previous_node_results from the database. + for node in [trigger_node, iterator_node]: + result = AutomationNodeHandler().dispatch_node( + node.id, + history_id=workflow_history.id, + ) + # Clear the local cache between dispatch_node() calls to simulate + # how Celery clears the local cache between tasks in production. 
+ clear_local() + + assert_dispatches_next_node( + result, + (iterator_child_1_node, workflow_history, {iterator_node.id: 0}), + (after_iteration_node, workflow_history, None), + ) + + for child_node in [iterator_child_1_node, iterator_child_2_node]: + result = AutomationNodeHandler().dispatch_node( + child_node.id, + history_id=workflow_history.id, + current_iterations={iterator_node.id: 0}, + ) + + # No more nodes to dispatch + assert result is None + + handle_workflow_dispatch_done(simulate_until_node_id=iterator_child_2_node.id) + + # Make sure the last iterator node simulation saves a history entry + iterator_child_2_node.service.specific.refresh_from_db() + assert iterator_child_2_node.service.specific.sample_data == { + "data": { + iterator_child_1_table_fields[0].name: AnyStr(), + "id": AnyInt(), + "order": AnyStr(), + }, + "output_uid": "", + "status": 200, + } + + mock_automation_node_updated.send.assert_called_once_with( + ANY, user=None, node=iterator_child_2_node + ) + + # Ensure workflow history is deleted, since we don't want history + # entries for simulations. 
+ assert ( + AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() + is False + ) + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_test_run( + data_fixture, +): + data = data_fixture.iterator_graph_fixture() + trigger_node = data["trigger_node"] + trigger_table_fields = data["trigger_table_fields"] + iterator_node = data["iterator_node"] + iterator_child_1_node = data["iterator_child_1_node"] + iterator_child_2_node = data["iterator_child_2_node"] + after_iteration_node = data["after_iteration_node"] + + workflow_history = create_workflow_history( + data_fixture, + trigger_node.workflow, + trigger_table_fields, + ) + + for node in [trigger_node, iterator_node]: + result = AutomationNodeHandler().dispatch_node( + node.id, + history_id=workflow_history.id, + ) + # Clear the local cache between dispatch_node() calls to simulate + # how Celery clears the local cache between tasks in production. + clear_local() + + assert_dispatches_next_node( + result, + (iterator_child_1_node, workflow_history, {iterator_node.id: 0}), + (iterator_child_1_node, workflow_history, {iterator_node.id: 1}), + (after_iteration_node, workflow_history, None), + ) + + # workflow history should still be "started", since the after iteration node + # hasn't been dispatched yet. 
+ workflow_history.refresh_from_db() + assert workflow_history.status == HistoryStatusChoices.STARTED + + # Make sure all nodes have a history and node result + for node in [ + trigger_node, + iterator_node, + iterator_child_1_node, + iterator_child_2_node, + ]: + for index, node_history in enumerate( + AutomationNodeHistory.objects.filter( + workflow_history=workflow_history, + node=node, + ) + ): + assert node_history.message == "" + assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == index + assert len(node_result.result) > 0 + + # Ensure workflow history exists for test runs + handle_workflow_dispatch_done(history_id=workflow_history.id) + assert ( + AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() + is True + ) + + # Dispatch the after iteration node + result = AutomationNodeHandler().dispatch_node( + after_iteration_node.id, + history_id=workflow_history.id, + ) + # there are no next nodes + assert result is None + + # workflow history should finally be updated as success + workflow_history.refresh_from_db() + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_action_update_row(data_fixture): + data = create_workflow( + data_fixture, + action_node_type="update_row", + action_node_service_value="foo", + ) + trigger_node = data["trigger_node"] + action_node = data["action_node"] + workflow_history = data["workflow_history"] + action_table = data["action_table"] + action_table_field = data["action_table_field"] + + # The value before the action updates the table + result = getattr( + action_table.get_model().objects.all()[0], action_table_field.db_column + ) + assert result == "foo" + + for node in [trigger_node, action_node]: + result = AutomationNodeHandler().dispatch_node( + node.id, + history_id=workflow_history.id, + ) + + # there are 
no next nodes + assert result is None + + # The value after the action updates the table + result = getattr( + action_table.get_model().objects.all()[0], action_table_field.db_column + ) + # Comes from the event_payload in workflow history + assert result == "Apple" + + handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() + assert workflow_history.message == "" + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + node_history = AutomationNodeHistory.objects.get( + workflow_history=workflow_history, + node=action_node, + ) + assert node_history.message == "" + assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == 0 + assert node_result.result == { + action_table_field.name: "Apple", + "id": AnyInt(), + "order": AnyStr(), + } + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_action_delete_row(data_fixture): + data = create_workflow( + data_fixture, + action_node_type="delete_row", + ) + trigger_node = data["trigger_node"] + action_node = data["action_node"] + workflow_history = data["workflow_history"] + action_table = data["action_table"] + + # The row before it is deleted + assert action_table.get_model().objects.all().count() == 1 + + for node in [trigger_node, action_node]: + result = AutomationNodeHandler().dispatch_node( + node.id, + history_id=workflow_history.id, + ) + + # there are no next nodes + assert result is None + + # The row after it is deleted + assert action_table.get_model().objects.all().count() == 0 + + handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() + assert workflow_history.message == "" + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + node_history = AutomationNodeHistory.objects.get( + workflow_history=workflow_history, + node=action_node, + ) + assert node_history.message == "" + 
assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == 0 + assert node_result.result == {} + + +@pytest.mark.django_db +def test_dispatch_node_dispatches_action_router(data_fixture): + data = create_workflow( + data_fixture, + action_node_type="update_row", + action_node_service_value="foo", + ) + workflow = data["workflow"] + trigger_node = data["trigger_node"] + action_node = data["action_node"] + workflow_history = data["workflow_history"] + action_table = data["action_table"] + action_table_field = data["action_table_field"] + action_table_row = data["action_table_row"] + integration = data["integration"] + + router_node = data_fixture.create_core_router_action_node( + workflow=workflow, + reference_node=action_node, + position="south", + ) + data_fixture.create_core_router_service_edge( + service=router_node.service, label="Edge 1", condition="'false'" + ) + edge_2 = data_fixture.create_core_router_service_edge( + service=router_node.service, + label="Edge 2", + condition="'true'", + skip_output_node=True, + ) + edge_2_node = data_fixture.create_local_baserow_update_row_action_node( + workflow=workflow, + reference_node=router_node, + position="south", + output=edge_2.uid, + service_kwargs={ + "table": action_table, + "integration": integration, + "row_id": action_table_row.id, + }, + ) + edge_2_node.service.field_mappings.create( + field=action_table_field, value="'Cherry'" + ) + + trigger_node.workflow.refresh_from_db() + + # The value before the router edge 2 updates the table + result = getattr( + action_table.get_model().objects.all()[0], action_table_field.db_column + ) + assert result == "foo" + + for node in [trigger_node, action_node, router_node, edge_2_node]: + result = AutomationNodeHandler().dispatch_node( + node.id, + history_id=workflow_history.id, + ) + + # there are no next nodes + assert result is None + + # The value after the 
router edge 2 updates the table + result = getattr( + action_table.get_model().objects.all()[0], action_table_field.db_column + ) + assert result == "Cherry" + + handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() + assert workflow_history.message == "" + assert workflow_history.status == HistoryStatusChoices.SUCCESS + + node_history = AutomationNodeHistory.objects.get( + workflow_history=workflow_history, + node=edge_2_node, + ) + assert node_history.message == "" + assert node_history.status == HistoryStatusChoices.SUCCESS + + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.iteration == 0 + assert node_result.result == { + action_table_field.name: "Cherry", + "id": AnyInt(), + "order": AnyStr(), + } + + +@pytest.mark.django_db(transaction=True) +def test_dispatch_node_with_advanced_formulas(data_fixture): + user = data_fixture.create_user() + workspace = data_fixture.create_workspace(user=user) + integration = data_fixture.create_local_baserow_integration(user=user) + database = data_fixture.create_database_application(workspace=workspace) + + trigger_table, trigger_table_fields, _ = data_fixture.build_table( + user=user, + columns=[ + ("Food", "text"), + ("Spiciness", "number"), + ], + rows=[ + ["Paneer Tikka", 5], + ["Gobi Manchurian", 8], + ], + ) + + action_table, action_table_fields, _ = data_fixture.build_table( + database=database, + user=user, + columns=[("Name", "text")], + rows=[], + ) + workflow = data_fixture.create_automation_workflow(user, state="live") + trigger = workflow.get_trigger() + trigger_service = trigger.service.specific + trigger_service.table = trigger_table + trigger_service.integration = integration + trigger_service.save() + action_node = data_fixture.create_local_baserow_create_row_action_node( + workflow=workflow, + service=data_fixture.create_local_baserow_upsert_row_service( + table=action_table, + integration=integration, + ), + ) + 
action_node.service.field_mappings.create( + field=action_table_fields[0], + value=( + "concat('The comparison is ', " + f"get('previous_node.{trigger.id}.0.{trigger_table_fields[1].db_column}') " + "> 7)" + ), + ) + + action_table_model = action_table.get_model() + assert action_table_model.objects.count() == 0 + + original_workflow = AutomationWorkflowHandler().get_original_workflow(workflow) + workflow_history = data_fixture.create_automation_workflow_history( + workflow=original_workflow, + event_payload={ + "results": [ + { + "id": 100, + "order": "10.00000000000000000000", + trigger_table_fields[0].name: "Paneer Tikka", + trigger_table_fields[1].name: 5, + }, + { + "id": 101, + "order": "10.00000000000000000000", + trigger_table_fields[0].name: "Gobi Manchurian", + trigger_table_fields[1].name: 8, + }, + ], + "has_next_page": False, + }, + ) + + result = AutomationNodeHandler().dispatch_node( + trigger.id, + history_id=workflow_history.id, + ) + assert_dispatches_next_node(result, (action_node, workflow_history, None)) + + result = AutomationNodeHandler().dispatch_node( + action_node.id, + history_id=workflow_history.id, + ) + assert result is None + + handle_workflow_dispatch_done(history_id=workflow_history.id) + row = action_table_model.objects.first() + assert getattr(row, action_table_fields[0].db_column) == "The comparison is false" + + +@pytest.mark.django_db +@patch(f"{NODE_HANDLER_PATH}.automation_node_updated") +def test_dispatch_node_dispatches_router_edge_simulation( + mock_automation_node_updated, + data_fixture, +): + user = data_fixture.create_user() + workspace = data_fixture.create_workspace(user=user) + integration = data_fixture.create_local_baserow_integration(user=user) + database = data_fixture.create_database_application(workspace=workspace) + + # Create trigger + workflow = data_fixture.create_automation_workflow( + user=user, + trigger_type="local_baserow_rows_created", + ) + trigger_node = workflow.get_trigger() + trigger_table = 
data_fixture.create_database_table(database=database) + trigger_table_field = data_fixture.create_text_field(table=trigger_table) + trigger_service = trigger_node.service.specific + trigger_service.table = trigger_table + trigger_service.integration = integration + trigger_service.save() + + # Create first router with two edges + router_a = data_fixture.create_core_router_action_node( + workflow=workflow, + reference_node=trigger_node, + position="south", + label="Router A", + ) + router_a_edge_1 = data_fixture.create_core_router_service_edge( + service=router_a.service, + label="Router A Edge 1", + condition="'true'", + skip_output_node=True, + ) + data_fixture.create_core_router_service_edge( + service=router_a.service, + label="Router A Edge 2", + condition="'false'", + ) + + # Create second router on edge_1 of router_a + router_b = data_fixture.create_core_router_action_node( + workflow=workflow, + reference_node=router_a, + position="south", + output=router_a_edge_1.uid, + label="Router B", + ) + data_fixture.create_core_router_service_edge( + service=router_b.service, + label="Router B Edge 1", + condition="'false'", + skip_output_node=True, + ) + router_b_edge_2 = data_fixture.create_core_router_service_edge( + service=router_b.service, + label="Router B Edge 2", + condition="'true'", + skip_output_node=True, + ) + + # Create action node on edge_2 of router_b + action_table = data_fixture.create_database_table(database=database) + action_table_field = data_fixture.create_text_field(table=action_table) + action_node = data_fixture.create_local_baserow_create_row_action_node( + workflow=workflow, + reference_node=router_b, + position="south", + output=router_b_edge_2.uid, + label="Action node", + service_kwargs={ + "table": action_table, + "integration": integration, + }, + ) + action_node.service.field_mappings.create( + field=action_table_field, + value="'Pumpkin Pie'", + ) + + # Ensure there is no sample data yet + for node in [trigger_node, router_a, 
router_b, action_node]: + assert node.service.specific.sample_data is None + + original_workflow = AutomationWorkflowHandler().get_original_workflow(workflow) + workflow_history = data_fixture.create_automation_workflow_history( + workflow=original_workflow, + event_payload={ + "results": [ + { + "id": 1, + "order": "1.00000000000000000000", + trigger_table_field.name: "Cherry Cake", + } + ], + "has_next_page": False, + }, + ) + workflow_history.simulate_until_node = action_node + workflow_history.save() + + for node in [trigger_node, router_a, router_b, action_node]: + result = AutomationNodeHandler().dispatch_node( + node.id, + history_id=workflow_history.id, + ) + + # there are no next nodes + assert result is None + + # Verify sample_data is saved for the action node + action_node.service.specific.refresh_from_db() + assert action_node.service.specific.sample_data == { + "data": { + action_table_field.name: "Pumpkin Pie", + "id": AnyInt(), + "order": AnyStr(), + }, + "output_uid": AnyStr(), + "status": 200, + } + + mock_automation_node_updated.send.assert_called_once_with( + ANY, user=None, node=action_node + ) + + # Verify workflow history is deleted for simulations + handle_workflow_dispatch_done(simulate_until_node_id=action_node.id) + assert ( + AutomationWorkflowHistory.objects.filter(id=workflow_history.id).exists() + is False + ) + + +@pytest.mark.django_db +def test_dispatch_node_sets_workflow_history_error(data_fixture): + """ + Ensure that when a node raises an error, the workflow history status + is correctly set to ERROR. 
+ """ + + data = create_workflow(data_fixture) + trigger_node = data["trigger_node"] + action_node = data["action_node"] + workflow_history = data["workflow_history"] + + # Trigger dispatches successfully + result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + assert isinstance(result, Signature) + + # Cause the action node to fail by unsetting its table + action_node.service.specific.table = None + action_node.service.specific.save() + + result = AutomationNodeHandler().dispatch_node( + action_node.id, + history_id=workflow_history.id, + ) + assert result is None + + # chord callback shouldn't overwrite ERROR with SUCCESS + handle_workflow_dispatch_done(history_id=workflow_history.id) + + workflow_history.refresh_from_db() + assert workflow_history.status == HistoryStatusChoices.ERROR + assert "No table selected" in workflow_history.message + + +@pytest.mark.django_db
def test_dispatch_node_iterator_with_no_rows(data_fixture): + """ + This test ensures that when the iterator node receives no rows, + an empty chain is not created (which would cause a crash). + """ + + # We want an Iterator node with no next-nodes. This is to ensure that when + # an iterator receives no rows, it doesn't create an empty chain. + data = data_fixture.iterator_graph_fixture(create_after_iteration_node=False) + trigger_node = data["trigger_node"] + trigger_table_fields = data["trigger_table_fields"] + iterator_node = data["iterator_node"] + iterator_child_1_node = data["iterator_child_1_node"] + + # Create workflow history with 0 rows in the event payload. + original_workflow = AutomationWorkflowHandler().get_original_workflow( + trigger_node.workflow + ) + workflow_history = data_fixture.create_automation_workflow_history( + workflow=original_workflow, + event_payload={ + "results": [], + "has_next_page": False, + }, + ) + + # Dispatch the trigger. 
+ result = AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + assert_dispatches_next_node(result, (iterator_node, workflow_history, None)) + + # Dispatch the iterator. + result = AutomationNodeHandler().dispatch_node( + iterator_node.id, + history_id=workflow_history.id, + ) + # Ensure we never return an empty chain, which would cause + # self.replace() to crash with an error. + assert result is None diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_handler.py b/backend/tests/baserow/contrib/automation/nodes/test_node_handler.py index acf8eeb13f..6cdb312151 100644 --- a/backend/tests/baserow/contrib/automation/nodes/test_node_handler.py +++ b/backend/tests/baserow/contrib/automation/nodes/test_node_handler.py @@ -1,10 +1,5 @@ -from unittest.mock import patch - import pytest -from baserow.contrib.automation.automation_dispatch_context import ( - AutomationDispatchContext, -) from baserow.contrib.automation.nodes.exceptions import AutomationNodeDoesNotExist from baserow.contrib.automation.nodes.handler import AutomationNodeHandler from baserow.contrib.automation.nodes.models import LocalBaserowRowsCreatedTriggerNode @@ -13,7 +8,7 @@ from baserow.core.cache import local_cache from baserow.core.trash.handler import TrashHandler from baserow.core.utils import MirrorDict -from baserow.test_utils.helpers import AnyDict, AnyInt, AnyStr +from baserow.test_utils.helpers import AnyDict @pytest.mark.django_db @@ -209,344 +204,3 @@ def test_import_node_only(data_fixture): "automation_workflow_nodes": {node.id: new_node.id}, "services": {node.service_id: new_node.service_id}, } - - -@pytest.mark.django_db -def test_simulate_dispatch_node_trigger(data_fixture): - user, _ = data_fixture.create_user_and_token() - database = data_fixture.create_database_application(user=user) - table, fields, _ = data_fixture.build_table( - user=user, columns=[("Name", "text")], rows=[], database=database - ) - workflow = 
data_fixture.create_automation_workflow( - user=user, trigger_service_kwargs={"table": table} - ) - - trigger_node = workflow.get_trigger() - - assert workflow.simulate_until_node is None - - action_node = data_fixture.create_automation_node( - workflow=workflow, - type="local_baserow_create_row", - ) - - # Set initial fake data for the action_node, since we want to test - # that it is not affected. - action_node.service.sample_data = {"foo": "bar"} - action_node.service.save() - - dispatch_context = AutomationDispatchContext( - action_node.workflow, {"trigger": "data"}, simulate_until_node=trigger_node - ) - - AutomationNodeHandler().dispatch_node(trigger_node, dispatch_context) - - trigger_node.refresh_from_db() - assert trigger_node.service.sample_data == { - "data": {"trigger": "data"}, - "output_uid": "", - "status": 200, - } - - action_node.refresh_from_db() - assert action_node.service.sample_data == {"foo": "bar"} - - -def create_action_node(data_fixture): - user, _ = data_fixture.create_user_and_token() - workspace = data_fixture.create_workspace(user=user) - automation = data_fixture.create_automation_application( - user=user, workspace=workspace - ) - workflow = data_fixture.create_automation_workflow(user=user, automation=automation) - integration = data_fixture.create_local_baserow_integration( - user=user, application=automation - ) - - database = data_fixture.create_database_application(user=user, workspace=workspace) - table, fields, _ = data_fixture.build_table( - user=user, - database=database, - columns=[("Name", "text")], - rows=[], - ) - action_service = data_fixture.create_local_baserow_upsert_row_service( - table=table, - integration=integration, - ) - action_service.field_mappings.create( - field=fields[0], - value="'A new row'", - ) - action_node = data_fixture.create_automation_node( - user=user, - workflow=workflow, - type="local_baserow_create_row", - service=action_service, - ) - - return { - "action_node": action_node, - "table": 
table, - "fields": fields, - "user": user, - } - - -@pytest.mark.django_db -def test_simulate_dispatch_node_action(data_fixture): - data = create_action_node(data_fixture) - action_node = data["action_node"] - table = data["table"] - fields = data["fields"] - - assert action_node.service.sample_data is None - - dispatch_context = AutomationDispatchContext( - action_node.workflow, None, simulate_until_node=action_node - ) - - AutomationNodeHandler().dispatch_node(action_node, dispatch_context) - - action_node.refresh_from_db() - row = table.get_model().objects.first() - - assert action_node.service.sample_data == { - "data": { - fields[0].name: "A new row", - "id": row.id, - "order": str(row.order), - }, - "output_uid": "", - "status": 200, - } - - -@pytest.mark.django_db -@patch("baserow.core.services.registries.ServiceType.get_sample_data") -def test_simulate_dispatch_node_action_with_update_sample_data( - mock_get_sample_data, data_fixture -): - data = create_action_node(data_fixture) - action_node = data["action_node"] - fields = data["fields"] - - assert action_node.service.sample_data is None - - mock_get_sample_data.return_value = None - - dispatch_context = AutomationDispatchContext( - action_node.workflow, None, simulate_until_node=action_node - ) - - AutomationNodeHandler().dispatch_node(action_node, dispatch_context) - - action_node.refresh_from_db() - - assert action_node.service.sample_data == { - "data": { - fields[0].name: "A new row", - "id": AnyInt(), - "order": AnyStr(), - }, - "output_uid": "", - "status": 200, - } - - -@pytest.mark.django_db -def test_simulate_dispatch_node_action_with_simulate_until_node(data_fixture): - data = create_action_node(data_fixture) - action_node_1 = data["action_node"] - table = data["table"] - fields = data["fields"] - - action_node_2 = data_fixture.create_automation_node( - workflow=action_node_1.workflow, - type="local_baserow_create_row", - ) - - action_node_3 = data_fixture.create_automation_node( - 
workflow=action_node_1.workflow, - type="local_baserow_create_row", - ) - - nodes = [action_node_1, action_node_2, action_node_3] - for node in nodes: - assert node.service.sample_data is None - - dispatch_context = AutomationDispatchContext( - action_node_1.workflow, None, simulate_until_node=action_node_1 - ) - - AutomationNodeHandler().dispatch_node(action_node_1, dispatch_context) - - # Only the first action nodes dispatch should be simulated - action_node_1.refresh_from_db() - row = table.get_model().objects.first() - assert action_node_1.service.sample_data == { - "data": { - fields[0].name: "A new row", - "id": row.id, - "order": str(row.order), - }, - "output_uid": "", - "status": 200, - } - - # Due to the simulate_until_node param in the dispatch context, the - # other nodes should not be dispatched. - for node in [action_node_2, action_node_3]: - node.refresh_from_db() - assert node.service.sample_data is None - - -def create_action_node_service(data_fixture, user, automation, value): - integration = data_fixture.create_local_baserow_integration( - user=user, application=automation - ) - database = data_fixture.create_database_application( - user=user, workspace=automation.workspace - ) - table, fields, _ = data_fixture.build_table( - user=user, - database=database, - columns=[("Name", "text")], - rows=[], - ) - service = data_fixture.create_local_baserow_upsert_row_service( - table=table, - integration=integration, - ) - service.field_mappings.create( - field=fields[0], - value=f"'{value}'", - ) - - return service - - -@pytest.mark.django_db -def test_simulate_dispatch_node_dispatches_correct_edge_node(data_fixture): - """ - Ensure that when simulating a dispatch for a node that is an edge, - it is correctly dispatched. 
- """ - - user, _ = data_fixture.create_user_and_token() - workflow = data_fixture.create_automation_workflow(user=user, create_trigger=False) - trigger_node = data_fixture.create_local_baserow_rows_created_trigger_node( - workflow=workflow - ) - - router_a = data_fixture.create_core_router_action_node( - workflow=workflow, label="Router A" - ) - router_a_edge_1 = data_fixture.create_core_router_service_edge( - service=router_a.service, - label="Router A, Edge 1", - condition="'true'", - skip_output_node=True, - ) - router_a_edge_2 = data_fixture.create_core_router_service_edge( - service=router_a.service, - label="Router A, Edge 2", - condition="'false'", - skip_output_node=True, - ) - - router_b = data_fixture.create_core_router_action_node( - workflow=workflow, - reference_node=router_a, - position="south", - output=router_a_edge_1.uid, - label="Router B", - ) - router_b_edge_1 = data_fixture.create_core_router_service_edge( - service=router_b.service, - label="Router B, Edge 1", - condition="'false'", - skip_output_node=True, - ) - router_b_edge_2 = data_fixture.create_core_router_service_edge( - service=router_b.service, - label="Router B, Edge 2", - condition="'true'", - skip_output_node=True, - ) - - node_b_service = create_action_node_service( - data_fixture, user, workflow.automation, "apple" - ) - node_b = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - service=node_b_service, - reference_node=router_a, - label="Create row A", - ) - - node_c_1_service = create_action_node_service( - data_fixture, user, workflow.automation, "banana" - ) - node_c_1 = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - service=node_c_1_service, - reference_node=router_b, - label="Create row B", - ) - node_c_2_service = create_action_node_service( - data_fixture, user, workflow.automation, "cherry" - ) - node_c_2 = data_fixture.create_local_baserow_create_row_action_node( - workflow=workflow, - 
service=node_c_2_service, - reference_node=router_b, - position="south", - output=str(router_b_edge_2.uid), - label="Create row B, on edge", - ) - - nodes = [trigger_node, router_a, router_b, node_b, node_c_1, node_c_2] - for node in nodes: - assert node.service.sample_data is None - - dispatch_context = AutomationDispatchContext( - workflow, None, simulate_until_node=node_c_2 - ) - - workflow.assert_reference( - { - "0": "local_baserow_rows_created", - "local_baserow_rows_created": {"next": {"": ["Router A"]}}, - "Router A": { - "next": {"": ["Create row A"], "Router A, Edge 1": ["Router B"]} - }, - "Create row A": {}, - "Create row B": {}, - "Create row B, on edge": {}, - "Router B": { - "next": { - "": ["Create row B"], - "Router B, Edge 2": ["Create row B, on edge"], - } - }, - } - ) - - AutomationNodeHandler().dispatch_node(node_c_2, dispatch_context) - - # node_c_2 is intentionally excluded. here - nodes = [trigger_node, router_a, router_b, node_b, node_c_1] - for node in nodes: - node.service.refresh_from_db() - assert node.service.sample_data is None - - node_c_2.refresh_from_db() - node_c_2.service.refresh_from_db() - field = node_c_2.service.specific.table.field_set.all()[0] - assert node_c_2.service.sample_data == { - "data": {field.name: "cherry", "id": AnyInt(), "order": AnyStr()}, - "output_uid": AnyStr(), - "status": 200, - } diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_types.py b/backend/tests/baserow/contrib/automation/nodes/test_node_types.py index ed15effdba..63cd759d1c 100644 --- a/backend/tests/baserow/contrib/automation/nodes/test_node_types.py +++ b/backend/tests/baserow/contrib/automation/nodes/test_node_types.py @@ -118,7 +118,7 @@ def test_automation_node_type_create_row_dispatch(mock_dispatch, data_fixture): user=user, type="local_baserow_create_row" ) - dispatch_context = AutomationDispatchContext(node.workflow, None) + dispatch_context = AutomationDispatchContext(node.workflow, 100) result = 
node.get_type().dispatch(node, dispatch_context) assert result == mock_dispatch_result @@ -182,7 +182,7 @@ def test_automation_node_type_update_row_dispatch(mock_dispatch, data_fixture): user=user, type="local_baserow_update_row" ) - dispatch_context = AutomationDispatchContext(node.workflow, None) + dispatch_context = AutomationDispatchContext(node.workflow, 100) result = node.get_type().dispatch(node, dispatch_context) assert result == mock_dispatch_result @@ -238,7 +238,7 @@ def test_automation_node_type_delete_row_dispatch(mock_dispatch, data_fixture): user=user, type="local_baserow_delete_row" ) - dispatch_context = AutomationDispatchContext(node.workflow, None) + dispatch_context = AutomationDispatchContext(node.workflow, 100) result = node.get_type().dispatch(node, dispatch_context) assert result == mock_dispatch_result @@ -443,7 +443,7 @@ def test_trigger_node_dispatch_returns_event_payload_if_not_simulated(data_fixtu trigger = workflow.get_trigger().specific - dispatch_context = AutomationDispatchContext(workflow, "foo") + dispatch_context = AutomationDispatchContext(workflow, 100, event_payload="foo") result = trigger.get_type().dispatch(trigger, dispatch_context) @@ -464,7 +464,9 @@ def test_trigger_node_dispatch_returns_sample_data_if_simulated(data_fixture): trigger = workflow.get_trigger() - dispatch_context = AutomationDispatchContext(workflow, simulate_until_node=trigger) + dispatch_context = AutomationDispatchContext( + workflow, 100, simulate_until_node=trigger + ) # If we don't reset this value, the trigger is considered as updatable and will # be dispatched. 
dispatch_context.update_sample_data_for = [] diff --git a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py index 1cc1b944df..72a0c1648a 100644 --- a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py +++ b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py @@ -8,6 +8,7 @@ from freezegun import freeze_time from baserow.contrib.automation.history.constants import HistoryStatusChoices +from baserow.contrib.automation.history.models import AutomationWorkflowHistory from baserow.contrib.automation.models import AutomationWorkflow from baserow.contrib.automation.nodes.node_types import ( CorePeriodicTriggerNodeType, @@ -18,6 +19,7 @@ WorkflowState, ) from baserow.contrib.automation.workflows.exceptions import ( + AutomationWorkflowBeforeRunError, AutomationWorkflowDoesNotExist, AutomationWorkflowNameNotUnique, AutomationWorkflowNotInAutomation, @@ -870,3 +872,150 @@ def test_toggle_simulate_mode_on_immediate( mock_automation_workflow_updated.send.assert_called_once() mock_async_start_workflow.assert_called_once() + + +@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=7) +@pytest.mark.django_db +def test_clear_old_history_deletes_history_older_than_max_days(data_fixture): + workflow = data_fixture.create_automation_workflow() + + with freeze_time("2025-02-01 12:00:00"): + old_history = data_fixture.create_automation_workflow_history(workflow=workflow) + + with freeze_time("2025-02-02 12:00:00"): + recent_history = data_fixture.create_automation_workflow_history( + workflow=workflow + ) + + # This is 8 days after old_history was created, so it should be deleted. 
+ with freeze_time("2025-02-09 12:00:00"): + AutomationWorkflowHandler()._clear_old_history(workflow) + + assert workflow.workflow_histories.filter(id=old_history.id).exists() is False + assert workflow.workflow_histories.filter(id=recent_history.id).exists() is True + + +@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=3) +@pytest.mark.django_db +def test_clear_old_history_keeps_only_max_entries(data_fixture): + workflow = data_fixture.create_automation_workflow() + + histories = [] + day = 10 + for i in range(5): + day += i + with freeze_time(f"2025-02-{day} 12:00:00"): + histories.append( + data_fixture.create_automation_workflow_history(workflow=workflow) + ) + + with freeze_time(f"2025-02-16 12:00:00"): + AutomationWorkflowHandler()._clear_old_history(workflow) + + assert workflow.workflow_histories.all().count() == 3 + + # The two oldest should be deleted + for history in histories[:2]: + assert workflow.workflow_histories.filter(id=history.id).exists() is False + + # The three newest should be kept + for history in histories[2:]: + assert workflow.workflow_histories.filter(id=history.id).exists() is True + + +@override_settings( + AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=3, + AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=1, +) +@pytest.mark.django_db +def test_clear_old_history_keeps_entries(data_fixture): + workflow = data_fixture.create_automation_workflow() + + with freeze_time("2025-02-01 12:00:00"): + history = data_fixture.create_automation_workflow_history(workflow=workflow) + + with freeze_time("2025-02-02 12:00:00"): + AutomationWorkflowHandler()._clear_old_history(workflow) + + # history is within limits, so it should be kept + assert workflow.workflow_histories.filter(id=history.id).exists() is True + + +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") +@patch("baserow.contrib.automation.nodes.tasks.dispatch_node_celery_task") +def test_start_workflow_too_many_errors( + mock_dispatch_task, 
mock_before_run, data_fixture +): + mock_before_run.side_effect = AutomationWorkflowTooManyErrors( + "mock too many errors" + ) + + original_workflow = data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + AutomationWorkflowHandler().start_workflow(published_workflow, None, None) + + # Nodes shouldn't be dispatched because before_run() should return early. + mock_dispatch_task.delay.assert_not_called() + + histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + + assert len(histories) == 1 + + history = histories[0] + assert history.workflow == original_workflow + assert history.status == HistoryStatusChoices.DISABLED + + error_msg = "mock too many errors" + assert history.message == error_msg + + original_workflow.refresh_from_db() + published_workflow.refresh_from_db() + + assert original_workflow.state == WorkflowState.DISABLED + assert published_workflow.state == WorkflowState.DISABLED + + +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run") +@patch("baserow.contrib.automation.nodes.tasks.dispatch_node_celery_task") +def test_start_workflow_before_run_error( + mock_dispatch_task, mock_before_run, data_fixture +): + # We already test the specific AutomationWorkflowTooManyErrors error above, + # but we should also test that before_run() has error handling. 
+ mock_before_run.side_effect = AutomationWorkflowBeforeRunError("unexpected error") + + original_workflow = data_fixture.create_automation_workflow() + published_workflow = data_fixture.create_automation_workflow( + state=WorkflowState.LIVE + ) + published_workflow.automation.published_from = original_workflow + published_workflow.automation.save() + + AutomationWorkflowHandler().start_workflow(published_workflow, None, None) + + # Nodes shouldn't be dispatched because before_run() should return early. + mock_dispatch_task.delay.assert_not_called() + + histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + + assert len(histories) == 1 + + history = histories[0] + assert history.workflow == original_workflow + assert history.status == HistoryStatusChoices.ERROR + + error_msg = "unexpected error" + assert history.message == error_msg + + original_workflow.refresh_from_db() + published_workflow.refresh_from_db() + + assert original_workflow.state == WorkflowState.DRAFT + assert published_workflow.state == WorkflowState.LIVE diff --git a/backend/tests/baserow/contrib/automation/workflows/test_workflow_tasks.py b/backend/tests/baserow/contrib/automation/workflows/test_workflow_tasks.py deleted file mode 100644 index 359c9bacb8..0000000000 --- a/backend/tests/baserow/contrib/automation/workflows/test_workflow_tasks.py +++ /dev/null @@ -1,148 +0,0 @@ -from unittest.mock import patch - -import pytest - -from baserow.contrib.automation.history.models import AutomationWorkflowHistory -from baserow.contrib.automation.workflows.constants import WorkflowState -from baserow.contrib.automation.workflows.exceptions import ( - AutomationWorkflowTooManyErrors, -) -from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler -from baserow.contrib.automation.workflows.tasks import start_workflow_celery_task -from baserow.core.services.exceptions import DispatchException - - -@pytest.mark.django_db 
-@patch("baserow.contrib.automation.nodes.handler.AutomationNodeHandler.dispatch_node") -def test_run_workflow_success_creates_workflow_history( - mock_dispatch_node, data_fixture -): - user = data_fixture.create_user() - original_workflow = data_fixture.create_automation_workflow(user) - published_workflow = data_fixture.create_automation_workflow( - user, state=WorkflowState.LIVE - ) - published_workflow.automation.published_from = original_workflow - published_workflow.automation.save() - - assert ( - AutomationWorkflowHistory.objects.filter(workflow=original_workflow).count() - == 0 - ) - - AutomationWorkflowHandler().start_workflow(published_workflow, {"event": "payload"}) - - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) - assert len(histories) == 1 - history = histories[0] - assert history.workflow == original_workflow - assert history.status == "success" - assert history.message == "" - - -@pytest.mark.django_db -@patch("baserow.contrib.automation.nodes.handler.AutomationNodeHandler.dispatch_node") -def test_run_workflow_dispatch_error_creates_workflow_history( - mock_dispatch_node, data_fixture -): - original_workflow = data_fixture.create_automation_workflow() - published_workflow = data_fixture.create_automation_workflow( - state=WorkflowState.LIVE - ) - published_workflow.automation.published_from = original_workflow - published_workflow.automation.save() - - mock_dispatch_node.side_effect = DispatchException("mock dispatch error") - - assert ( - AutomationWorkflowHistory.objects.filter(workflow=original_workflow).count() - == 0 - ) - - result = start_workflow_celery_task(published_workflow.id, False, None) - - assert result is None - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) - assert len(histories) == 1 - history = histories[0] - assert history.workflow == original_workflow - assert history.status == "error" - assert history.message == "mock dispatch error" - - 
-@pytest.mark.django_db -@patch("baserow.contrib.automation.nodes.handler.AutomationNodeHandler.dispatch_node") -@patch("baserow.contrib.automation.workflows.handler.logger") -def test_run_workflow_unexpected_error_creates_workflow_history( - mock_logger, mock_dispatch_node, data_fixture -): - original_workflow = data_fixture.create_automation_workflow() - published_workflow = data_fixture.create_automation_workflow( - state=WorkflowState.LIVE - ) - published_workflow.automation.published_from = original_workflow - published_workflow.automation.save() - - mock_dispatch_node.side_effect = ValueError("mock unexpected error") - - assert ( - AutomationWorkflowHistory.objects.filter(workflow=original_workflow).count() - == 0 - ) - - result = start_workflow_celery_task(published_workflow.id, False, None) - - assert result is None - - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) - assert len(histories) == 1 - history = histories[0] - assert history.workflow == original_workflow - assert history.status == "error" - error_msg = ( - f"Unexpected error while running workflow {original_workflow.id}. 
" - "Error: mock unexpected error" - ) - assert history.message == error_msg - mock_logger.exception.assert_called_once_with(error_msg) - - -@pytest.mark.django_db -@patch( - "baserow.contrib.automation.workflows.handler.AutomationWorkflowHandler._check_too_many_errors" -) -@patch("baserow.contrib.automation.nodes.handler.AutomationNodeHandler.dispatch_node") -def test_run_workflow_disables_workflow_if_too_many_consecutive_errors( - mock_dispatch_node, mock_has_too_many_errors, data_fixture -): - mock_has_too_many_errors.side_effect = AutomationWorkflowTooManyErrors( - "mock too many errors" - ) - - original_workflow = data_fixture.create_automation_workflow() - published_workflow = data_fixture.create_automation_workflow( - state=WorkflowState.LIVE - ) - published_workflow.automation.published_from = original_workflow - published_workflow.automation.save() - - start_workflow_celery_task(published_workflow.id, False, None) - - mock_dispatch_node.assert_not_called() - - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) - - assert len(histories) == 1 - - history = histories[0] - assert history.workflow == original_workflow - assert history.status == "disabled" - - error_msg = "mock too many errors" - assert history.message == error_msg - - original_workflow.refresh_from_db() - published_workflow.refresh_from_db() - - assert original_workflow.state == WorkflowState.DISABLED - assert published_workflow.state == WorkflowState.DISABLED diff --git a/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py b/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py index 8d2e4fc5ec..607053408e 100644 --- a/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py +++ b/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py @@ -1,12 +1,15 @@ import json from unittest.mock import Mock, patch +from django.utils 
import timezone + import pytest from baserow.contrib.automation.automation_dispatch_context import ( AutomationDispatchContext, ) from baserow.contrib.automation.formula_importer import import_formula +from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.contrib.integrations.slack.service_types import ( SlackWriteMessageServiceType, ) @@ -16,7 +19,6 @@ ServiceImproperlyConfiguredDispatchException, ) from baserow.core.services.handler import ServiceHandler -from baserow.core.services.types import DispatchResult from baserow.test_utils.helpers import AnyInt from baserow.test_utils.pytest_conftest import FakeDispatchContext @@ -139,7 +141,22 @@ def test_dispatch_slack_write_message_with_formulas(data_fixture): user = data_fixture.create_user() application = data_fixture.create_automation_application(user=user) workflow = data_fixture.create_automation_workflow(automation=application) + workflow_history = AutomationHistoryHandler().create_workflow_history( + workflow, + timezone.now(), + False, + ) + trigger = workflow.get_trigger() + trigger_node_history = AutomationHistoryHandler().create_node_history( + workflow_history=workflow_history, + node=trigger, + started_on=timezone.now(), + ) + AutomationHistoryHandler().create_node_result( + node_history=trigger_node_history, + result={"results": [{"name": "John"}]}, + ) integration = IntegrationService().create_integration( user, @@ -156,9 +173,9 @@ def test_dispatch_slack_write_message_with_formulas(data_fixture): ) service_type = service.get_type() - dispatch_context = AutomationDispatchContext(workflow) - dispatch_context.after_dispatch( - trigger, DispatchResult(data={"results": [{"name": "John"}]}) + dispatch_context = AutomationDispatchContext( + workflow, + workflow_history.id, ) mock_response = Mock() diff --git a/changelog/entries/unreleased/refactor/3838_add_support_for_dispatching_automation_nodes_asynchronously.json 
b/changelog/entries/unreleased/refactor/3838_add_support_for_dispatching_automation_nodes_asynchronously.json new file mode 100644 index 0000000000..326e60c085 --- /dev/null +++ b/changelog/entries/unreleased/refactor/3838_add_support_for_dispatching_automation_nodes_asynchronously.json @@ -0,0 +1,9 @@ +{ + "type": "refactor", + "message": "Add support for dispatching automation nodes asynchronously.", + "issue_origin": "github", + "issue_number": 3838, + "domain": "automation", + "bullet_points": [], + "created_at": "2026-02-12" +} \ No newline at end of file diff --git a/docker-compose.no-caddy.yml b/docker-compose.no-caddy.yml index a55c1427e0..b5e0016d87 100644 --- a/docker-compose.no-caddy.yml +++ b/docker-compose.no-caddy.yml @@ -87,6 +87,8 @@ x-backend-variables: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: BASEROW_EXTRA_ALLOWED_HOSTS: ADDITIONAL_APPS: @@ -243,6 +245,8 @@ services: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: depends_on: - backend networks: diff --git a/docker-compose.yml b/docker-compose.yml index cf0c88634f..7470288671 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -100,6 +100,8 @@ x-backend-variables: BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: BASEROW_EXTRA_ALLOWED_HOSTS: ADDITIONAL_APPS: @@ -321,6 +323,8 @@ services: 
BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS: BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS: + BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES: BASEROW_INTEGRATIONS_PERIODIC_MINUTE_MIN: BASEROW_ENTERPRISE_ASSISTANT_LLM_MODEL: depends_on: