Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ DATABASE_NAME=baserow
# BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS=
# BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=
# BASEROW_EXTRA_ALLOWED_HOSTS=
# ADDITIONAL_APPS=
# ADDITIONAL_MODULES=
Expand Down
12 changes: 12 additions & 0 deletions backend/src/baserow/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@
"CELERY_REDBEAT_LOCK_TIMEOUT", CELERY_BEAT_MAX_LOOP_INTERVAL + 60
)

# Store Celery task results in the same Redis instance used as the broker.
CELERY_RESULT_BACKEND = REDIS_URL
# Seconds before stored task results are evicted. `or 3600` (rather than a
# getenv default) also covers the env var being present but empty (`VAR=`).
CELERY_RESULT_EXPIRES = int(
    # default 1 hour
    os.getenv("CELERY_RESULT_EXPIRES") or 3600
)

CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
Expand Down Expand Up @@ -819,6 +825,12 @@ def __setitem__(self, key, value):
# Number of consecutive errors tolerated for a workflow; defaults to 5 when
# the env var is unset or empty. Using `or` (matching CELERY_RESULT_EXPIRES
# above) avoids int("") crashing when a .env file contains `VAR=` with no
# value, which `os.getenv(name, default)` would not catch.
AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS = int(
    os.getenv("BASEROW_AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS") or 5
)
# Maximum age, in days, for workflow history entries; defaults to 30 when the
# env var is unset or empty.
AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS = int(
    os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS") or 30
)
# Maximum number of workflow history entries; defaults to 50 when the env var
# is unset or empty.
AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES = int(
    os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES") or 50
)

TRASH_PAGE_SIZE_LIMIT = 200 # How many trash entries can be requested at once.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class AutomationWorkflowHistoryView(APIView):
}
)
def get(self, request, workflow_id: int):
queryset = AutomationHistoryService().get_workflow_history(
queryset = AutomationHistoryService().get_workflow_histories(
request.user, workflow_id
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,51 @@
from baserow.contrib.automation.data_providers.registries import (
automation_data_provider_type_registry,
)
from baserow.contrib.automation.nodes.models import AutomationActionNode, AutomationNode
from baserow.contrib.automation.history.models import AutomationNodeResult
from baserow.contrib.automation.nodes.models import AutomationActionNode
from baserow.contrib.automation.workflows.models import AutomationWorkflow
from baserow.core.cache import local_cache
from baserow.core.services.dispatch_context import DispatchContext
from baserow.core.services.models import Service
from baserow.core.services.types import DispatchResult
from baserow.core.services.utils import ServiceAdhocRefinements


class AutomationDispatchContext(DispatchContext):
own_properties = ["workflow", "event_payload", "simulate_until_node"]
own_properties = ["workflow", "event_payload", "history_id"]

def __init__(
self,
workflow: AutomationWorkflow,
history_id: int,
event_payload: Optional[Union[Dict, List[Dict]]] = None,
simulate_until_node: Optional[AutomationActionNode] = None,
current_iterations: Optional[Dict[int, int]] = None,
):
"""
The `DispatchContext` implementation for automations. This context is provided
to nodes, and can be modified so that following nodes are aware of a proceeding
node's changes.

:param workflow: The workflow that this dispatch context is associated with.
:param history_id: The AutomationWorkflowHistory ID from which the
workflow's event payload and node results are derived.
:param event_payload: The event data from the trigger node, if any was
provided, as this is optional.
:param simulate_until_node: Stop simulating the dispatch once this node
is reached.
:param current_iterations: Used by the Iterator node's children.
"""

self.workflow = workflow
self.previous_nodes_results: Dict[int, Any] = {}
self.dispatch_history: List[int] = []
self.history_id = history_id
self.simulate_until_node = simulate_until_node
self.current_iterations: Dict[int, int] = {}

if current_iterations:
# The keys are strings due to JSON serialization by Celery. We need
# to convert them back to ints.
self.current_iterations = {int(k): v for k, v in current_iterations.items()}

services = (
[self.simulate_until_node.service.specific]
if self.simulate_until_node
Expand All @@ -59,16 +69,40 @@ def __init__(

def clone(self, **kwargs):
new_context = super().clone(**kwargs)
new_context.previous_nodes_results = {**self.previous_nodes_results}
new_context.current_iterations = {**self.current_iterations}
new_context.dispatch_history = list(self.dispatch_history)

return new_context

def _get_previous_results_cache_key(self) -> Optional[str]:
    """
    Return the local-cache key under which this run's previously stored
    node results are memoised, namespaced by the workflow history ID.
    """

    return f"wa_previous_nodes_results_{self.history_id}"

def _load_previous_results(self) -> Dict[int, Any]:
    """
    Load the stored results of previously dispatched nodes for this
    workflow history run.

    :return: A dict mapping node IDs to their stored result payloads.
    """

    results = {}
    # select_related avoids one extra query per row when reading
    # result.node_history.node_id below.
    previous_results = AutomationNodeResult.objects.filter(
        node_history__workflow_history_id=self.history_id
    ).select_related("node_history__node")
    for result in previous_results:
        # NOTE(review): if a node has results for several iterations, the
        # last row iterated wins here — confirm that is intended.
        results[result.node_history.node_id] = result.result

    return results

@property
def data_provider_registry(self):
return automation_data_provider_type_registry

@property
def previous_nodes_results(self) -> Dict[int, Any]:
    # Lazily loads the stored results of previously dispatched nodes for
    # this run, memoised in the request-local cache so repeated property
    # reads don't re-query the database.
    if cache_key := self._get_previous_results_cache_key():
        return local_cache.get(
            cache_key,
            lambda: self._load_previous_results(),
        )
    return {}

def get_timezone_name(self) -> str:
"""
TODO: Get the timezone from the application settings. For now, returns
Expand All @@ -77,23 +111,6 @@ def get_timezone_name(self) -> str:

return super().get_timezone_name()

def _register_node_result(
self, node: AutomationNode, dispatch_data: Dict[str, Any]
):
self.previous_nodes_results[node.id] = dispatch_data

def after_dispatch(self, node: AutomationNode, dispatch_result: DispatchResult):
"""
This method is called after each node dispatch. It can be used
to perform any final actions or cleanup.
"""

self.dispatch_history.append(node.id)
self._register_node_result(node, dispatch_result.data)

def set_current_iteration(self, node, index):
self.current_iterations[node.id] = index

def range(self, service: Service):
    # [offset, count] — offset 0 with a None count; presumably "no paging
    # limit" for automation dispatches. TODO confirm against the
    # DispatchContext.range contract.
    return [0, None]

Expand Down
17 changes: 17 additions & 0 deletions backend/src/baserow/contrib/automation/history/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from baserow.contrib.automation.exceptions import AutomationError


class AutomationWorkflowHistoryError(AutomationError):
    """Base class for automation workflow history errors."""


class AutomationWorkflowHistoryDoesNotExist(AutomationWorkflowHistoryError):
    """Raised when the requested workflow history entry cannot be found."""

    def __init__(self, history_id=None, *args, **kwargs):
        # Keep the missing ID around so callers (e.g. API error mappers)
        # can surface it to the client.
        self.history_id = history_id
        message = f"The automation workflow history {history_id} does not exist."
        super().__init__(message, *args, **kwargs)
87 changes: 81 additions & 6 deletions backend/src/baserow/contrib/automation/history/handler.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,117 @@
from datetime import datetime
from typing import Optional
from typing import Dict, List, Optional, Union

from django.db.models import QuerySet

from baserow.contrib.automation.history.constants import HistoryStatusChoices
from baserow.contrib.automation.history.models import AutomationWorkflowHistory
from baserow.contrib.automation.history.exceptions import (
AutomationWorkflowHistoryDoesNotExist,
)
from baserow.contrib.automation.history.models import (
AutomationNodeHistory,
AutomationNodeResult,
AutomationWorkflowHistory,
)
from baserow.contrib.automation.nodes.models import AutomationNode
from baserow.contrib.automation.workflows.models import AutomationWorkflow


class AutomationHistoryHandler:
def get_workflow_history(
def get_workflow_histories(
self, workflow: AutomationWorkflow, base_queryset: Optional[QuerySet] = None
) -> QuerySet[AutomationWorkflowHistory]:
"""
Returns all the AutomationWorkflowHistory related to the provided workflow.

Excludes any simulation histories that haven't yet been deleted.
"""

if base_queryset is None:
base_queryset = AutomationWorkflowHistory.objects.all()

return base_queryset.filter(workflow=workflow).prefetch_related(
"workflow__automation__workspace"
)
return base_queryset.filter(
workflow=workflow,
simulate_until_node__isnull=True,
).prefetch_related("workflow__automation__workspace")

def get_workflow_history(
    self, history_id: int, base_queryset: Optional[QuerySet] = None
) -> AutomationWorkflowHistory:
    """
    Fetch a single AutomationWorkflowHistory by primary key.

    :param history_id: The ID of the AutomationWorkflowHistory.
    :param base_queryset: Can be provided to already filter or apply
        performance improvements to the queryset when it's being executed.
    :raises AutomationWorkflowHistoryDoesNotExist: If the history doesn't
        exist.
    :return: The model instance of the AutomationWorkflowHistory.
    """

    queryset = (
        base_queryset
        if base_queryset is not None
        else AutomationWorkflowHistory.objects.all()
    )
    # Pull the workspace chain in the same query; callers routinely need it
    # for permission checks.
    queryset = queryset.select_related("workflow__automation__workspace")

    try:
        return queryset.get(id=history_id)
    except AutomationWorkflowHistory.DoesNotExist:
        raise AutomationWorkflowHistoryDoesNotExist(history_id)

def create_workflow_history(
    self,
    workflow: AutomationWorkflow,
    started_on: datetime,
    is_test_run: bool,
    event_payload: Optional[Union[Dict, List[Dict]]] = None,
    simulate_until_node: Optional[AutomationNode] = None,
    status: HistoryStatusChoices = HistoryStatusChoices.STARTED,
    completed_on: Optional[datetime] = None,
    message: str = "",
) -> AutomationWorkflowHistory:
    """Persist a new history entry describing a workflow run."""

    history_kwargs = {
        "workflow": workflow,
        "started_on": started_on,
        "is_test_run": is_test_run,
        "simulate_until_node": simulate_until_node,
        "event_payload": event_payload,
        "status": status,
        "completed_on": completed_on,
        "message": message,
    }
    return AutomationWorkflowHistory.objects.create(**history_kwargs)

def create_node_history(
    self,
    workflow_history: AutomationWorkflowHistory,
    node: AutomationNode,
    started_on: datetime,
    status: HistoryStatusChoices = HistoryStatusChoices.STARTED,
    completed_on: Optional[datetime] = None,
    message: str = "",
) -> AutomationNodeHistory:
    """Persist a history entry for a single node dispatch."""

    node_history = AutomationNodeHistory.objects.create(
        workflow_history=workflow_history,
        node=node,
        started_on=started_on,
        status=status,
        completed_on=completed_on,
        message=message,
    )
    return node_history

def create_node_result(
    self,
    node_history: AutomationNodeHistory,
    result: Optional[Union[Dict, List[Dict]]] = None,
    iteration: int = 0,
) -> AutomationNodeResult:
    """
    Save the result of a Node dispatch.

    :param node_history: The AutomationNodeHistory this result belongs to.
    :param result: The dispatch result payload; stored as an empty dict
        when omitted.
    :param iteration: The Iterator-node iteration this result belongs to.
    :return: The created AutomationNodeResult.
    """

    # Only substitute the default when the result is actually missing. The
    # previous `result if result else {}` coerced falsy-but-valid payloads
    # (e.g. an empty list, which the annotation permits) into {}.
    if result is None:
        result = {}
    return AutomationNodeResult.objects.create(
        node_history=node_history,
        iteration=iteration,
        result=result,
    )
64 changes: 62 additions & 2 deletions backend/src/baserow/contrib/automation/history/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ class AutomationHistory(models.Model):

message = models.TextField()

is_test_run = models.BooleanField()
is_test_run = models.BooleanField(
db_default=False
) # TODO ZDM: Remove after next release

status = models.CharField(
choices=HistoryStatusChoices.choices,
Expand All @@ -25,5 +27,63 @@ class AutomationWorkflowHistory(AutomationHistory):
workflow = models.ForeignKey(
"automation.AutomationWorkflow",
on_delete=models.CASCADE,
related_name="workflow_history",
related_name="workflow_histories",
)
simulate_until_node = models.ForeignKey(
"automation.AutomationNode",
on_delete=models.CASCADE,
null=True,
blank=True,
related_name="simulation_histories",
)

is_test_run = models.BooleanField(
db_default=False,
help_text="True when the workflow is being simulated.",
)

event_payload = models.JSONField(
db_default=None,
null=True,
blank=True,
help_text="Event payload received by the workflow.",
)


class AutomationNodeHistory(AutomationHistory):
    """History entry for a single node dispatch within a workflow run."""

    # The workflow run this node dispatch belongs to.
    workflow_history = models.ForeignKey(
        "automation.AutomationWorkflowHistory",
        on_delete=models.CASCADE,
        related_name="node_histories",
    )
    # The node that was dispatched.
    node = models.ForeignKey(
        "automation.AutomationNode",
        on_delete=models.CASCADE,
        related_name="node_histories",
    )

    class Meta:
        # Composite index to speed up looking up a specific node's entries
        # within one workflow run.
        indexes = [
            models.Index(fields=["workflow_history", "node"]),
        ]


class AutomationNodeResult(models.Model):
    """Stores the output produced by one node dispatch, per iteration."""

    # The node-dispatch history entry this result belongs to.
    node_history = models.ForeignKey(
        "automation.AutomationNodeHistory",
        on_delete=models.CASCADE,
        related_name="node_results",
    )

    iteration = models.PositiveIntegerField(
        db_default=0,
        help_text="Keeps track of the current iteration of the Iterator node.",
    )

    # NOTE(review): db_default={} is a database-level default (not the classic
    # mutable `default={}` Python pitfall), but confirm it migrates correctly
    # on all supported database backends.
    result = models.JSONField(
        db_default={},
        help_text="Contains node results.",
    )

    class Meta:
        # At most one stored result per (node dispatch, iteration) pair.
        unique_together = [["node_history", "iteration"]]
Loading
Loading