Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions backend/src/baserow/contrib/automation/nodes/node_types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable, Dict, Iterable, Optional

from django.contrib.auth.models import AbstractUser
from django.db import router
from django.db.models import Q, QuerySet
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import gettext as _

Expand Down Expand Up @@ -340,8 +340,8 @@ def before_move(

def on_event(
self,
services: QuerySet[Service],
event_payload: List[Dict] | None | Callable = None,
services: Iterable[Service],
event_payload: Dict | None | Callable = None,
user: Optional[AbstractUser] = None,
):
from baserow.contrib.automation.workflows.handler import (
Expand All @@ -363,12 +363,16 @@ def on_event(
.select_related("workflow__automation__workspace")
)

if triggers and callable(event_payload):
event_payload = event_payload()
# For perf reasons, store the trigger<->service relationship.
service_map = {service.id: service for service in services}

for trigger in triggers:
workflow = trigger.workflow
# If we've received a callable payload, call it with the specific service,
# this can give us a payload that is specific to the trigger's service.
if callable(event_payload):
event_payload = event_payload(service_map[trigger.service_id])

workflow = trigger.workflow
AutomationWorkflowHandler().async_start_workflow(
workflow,
event_payload,
Expand Down
7 changes: 7 additions & 0 deletions backend/src/baserow/contrib/integrations/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ class CorePeriodicService(Service):
help_text="Timestamp when the service was last executed periodically. This "
"value is used to calculate when it should be run.",
)
next_run_at = models.DateTimeField(
null=True,
db_index=True,
db_default=None,
help_text="The next scheduled time for this service to run. Automatically "
"calculated based on the interval and schedule fields.",
)
interval = models.CharField(
max_length=10,
choices=PERIODIC_INTERVAL_CHOICES,
Expand Down
212 changes: 118 additions & 94 deletions backend/src/baserow/contrib/integrations/core/service_types.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import json
import socket
import uuid
from datetime import datetime, timedelta
from datetime import datetime
from smtplib import SMTPAuthenticationError, SMTPConnectError, SMTPNotSupportedError
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.mail import EmailMultiAlternatives, get_connection
from django.db import router
from django.db.models import DurationField, ExpressionWrapper, F, Q, Value
from django.db.models.functions import Coalesce, NullIf
from django.db.models import Q, QuerySet
from django.urls import path
from django.utils import timezone
from django.utils.translation import gettext as _

from dateutil.relativedelta import relativedelta
from genson import SchemaBuilder
from loguru import logger
from requests import exceptions as request_exceptions
Expand All @@ -31,11 +29,7 @@
BODY_TYPE,
HTTP_METHOD,
PERIODIC_INTERVAL_CHOICES,
PERIODIC_INTERVAL_DAY,
PERIODIC_INTERVAL_HOUR,
PERIODIC_INTERVAL_MINUTE,
PERIODIC_INTERVAL_MONTH,
PERIODIC_INTERVAL_WEEK,
)
from baserow.contrib.integrations.core.exceptions import (
CoreHTTPTriggerServiceDoesNotExist,
Expand All @@ -54,6 +48,7 @@
HTTPHeader,
HTTPQueryParam,
)
from baserow.contrib.integrations.core.utils import calculate_next_periodic_run
from baserow.contrib.integrations.utils import get_http_request_function
from baserow.core.formula.types import BaserowFormulaObject
from baserow.core.formula.validator import (
Expand Down Expand Up @@ -1172,6 +1167,7 @@ class CorePeriodicServiceType(TriggerServiceTypeMixin, CoreServiceType):
"hour",
"day_of_week",
"day_of_month",
"next_run_at",
]

serializer_field_overrides = {
Expand Down Expand Up @@ -1219,6 +1215,7 @@ class SerializedDict(ServiceDict):
hour: int
day_of_week: int
day_of_month: int
next_run_at: datetime

def prepare_values(
self,
Expand Down Expand Up @@ -1279,16 +1276,33 @@ def stop_listening(self):
super().stop_listening()
self._cancel_periodic_task()

def _get_payload(self, now=None):
now = now if now else timezone.now()
return {"triggered_at": now.isoformat()}
def _get_dispatch_payload(self, service: CorePeriodicService) -> Dict[str, str]:
return {
"triggered_at": service.last_periodic_run.isoformat(),
"next_run_at": service.next_run_at.isoformat(),
}

def _get_simulation_payload(self, service: CorePeriodicService) -> Dict[str, str]:
now = timezone.now().replace(second=0, microsecond=0)
next_run = calculate_next_periodic_run(
interval=service.interval,
minute=service.minute,
hour=service.hour,
day_of_week=service.day_of_week,
day_of_month=service.day_of_month,
from_time=now,
)
return {
"triggered_at": now.isoformat(),
"next_run_at": next_run.isoformat(),
}

def dispatch_data(
self,
service: CorePeriodicService,
resolved_values: Dict[str, Any],
dispatch_context: DispatchContext,
) -> None:
) -> Dict[str, str]:
"""
Responsible for dispatching a single periodic service. In practice we
dispatch all periodic services that are due in one go so this method just
Expand All @@ -1302,90 +1316,29 @@ def dispatch_data(
if dispatch_context.event_payload is not None:
return dispatch_context.event_payload

return self._get_payload()
return self._get_simulation_payload(service)

def call_periodic_services_that_are_due(self, now: datetime):
def get_periodic_services_that_are_due(
self, current: datetime = None
) -> QuerySet[CorePeriodicService]:
"""
Responsible for finding all periodic services that are due to run and
calling the `on_event` callback with them.
Responsible for fetching all periodic services which are due to be run at the
given time. It also locks these services to prevent multiple workers from
dispatching the same service.

:param now: The current datetime.
:param current: The current time to compare the `next_run_at` field to.
If not provided, it will default to the current time.
:return: A queryset of `CorePeriodicService` instances which are due to be run.
"""

# Truncate to minute precision for consistent interval calculations
# Note: we replace the seconds and microseconds due to jitter that
# can exist between when the Celery task is scheduled, and when the
# services were actually processed. For example:
#
# Assuming `interval=minute` and `minute=2` (i.e. run every two minutes):
# - Celery runs at 12:00:00.500 (half a second delay)
# - Service is triggered, last_periodic_run = 12:00:00.500
# Next checks:
# - At 12:01:00.400: Is 12:00:00.500 <= 11:59:00.400? NO
# - At 12:02:00.300: Is 12:00:00.500 <= 12:00:00.300? NO (500ms > 300ms!)
# - At 12:03:00.200: Is 12:00:00.500 <= 12:01:00.200? YES
now = now.replace(second=0, microsecond=0)

query_conditions = Q()
is_null = Q(last_periodic_run__isnull=True)

# MINUTE
minute_condition = Q(
is_null
| Q(
last_periodic_run__lte=now
- ExpressionWrapper(
Coalesce(NullIf(F("minute"), 0), Value(1)) * timedelta(minutes=1),
output_field=DurationField(),
)
),
interval=PERIODIC_INTERVAL_MINUTE,
)
query_conditions |= minute_condition

# HOUR
hour_ago = now - timedelta(hours=1)
hour_condition = Q(
is_null | Q(last_periodic_run__lt=hour_ago),
interval=PERIODIC_INTERVAL_HOUR,
minute__lte=now.minute,
)
query_conditions |= hour_condition

# DAY
day_ago = now - timedelta(days=1)
day_condition = Q(
is_null | Q(last_periodic_run__lt=day_ago),
interval=PERIODIC_INTERVAL_DAY,
hour__lte=now.hour,
minute__lte=now.minute,
)
query_conditions |= day_condition

# WEEK
week_ago = now - timedelta(weeks=1)
week_condition = Q(
is_null | Q(last_periodic_run__lt=week_ago),
interval=PERIODIC_INTERVAL_WEEK,
day_of_week=now.weekday(),
hour__lte=now.hour,
minute__lte=now.minute,
)
query_conditions |= week_condition

# MONTH
month_ago = now - relativedelta(months=1)
month_condition = Q(
is_null | Q(last_periodic_run__lt=month_ago),
interval=PERIODIC_INTERVAL_MONTH,
day_of_month=now.day,
hour__lte=now.hour,
minute__lte=now.minute,
)
query_conditions |= month_condition
# If we haven't been given the current time, get it.
current = current or timezone.now()

periodic_services = (
CorePeriodicService.objects.filter(query_conditions)
return (
CorePeriodicService.objects.filter(
Q(next_run_at__lte=current.replace(second=0, microsecond=0))
| Q(next_run_at__isnull=True)
)
.select_for_update(
of=("self",),
skip_locked=True,
Expand All @@ -1394,12 +1347,79 @@ def call_periodic_services_that_are_due(self, now: datetime):
.order_by("id")
)

def call_periodic_services_that_are_due(self):
"""
Responsible for finding all periodic services that are due. This will likely
result in services which are due, but not dispatchable, it is up to the parent
instance (e.g. automation trigger) to determine if the due service is *also*
dispatchable (e.g. a trigger wants to know if the workflow is published).

Only services which were dispatched will be included in the bulk-update, where
the two date fields are refreshed for their next run.
"""

# Truncate to minute precision for consistent comparisons
now = timezone.now().replace(second=0, microsecond=0)

# Determine which services are due to be run. This is not an exhaustive list,
# it's possible only a subset of these will actually be dispatched.
periodic_services_due = self.get_periodic_services_that_are_due(now)

# This list will contain the definitive list of services that were marked
# for dispatching by the parent, and which we will update the `next_run_at` and
# `last_periodic_run` fields for.
periodic_services_dispatched = []

def _get_service_payload(
dispatched_service: CorePeriodicService,
) -> Dict[str, str]:
"""
Responsible for returning this service's specific payload,
and also creating a list of our due services which were *also*
deemed to be dispatchable by the parent.

:param dispatched_service: The service which will be dispatched.
:return: The payload to dispatch for this service.
"""

# Calculate next run from the current `next_run_at` (not from 'now').
# This prevents drift even if the service runs late.
# If the service ran *very* late (e.g. it was scheduled, but the server
# was down, and we didn't run any services), keep advancing until we get
# a future time. For example:
# - A service is scheduled to run every minute.
# The next run is scheduled for 10:00:00.
# - The server is down, and we only restart it at 10:05:00.
# - When we restart, we run the service (as 10:00:00 <= 10:05:00).
# - The next run would technically be scheduled for 10:01:00, but that time
# is also in the past. So we keep advancing until we find a future time
# (10:06:00 in this case).
next_run = dispatched_service.next_run_at or now
while next_run <= now:
next_run = calculate_next_periodic_run(
interval=dispatched_service.interval,
minute=dispatched_service.minute,
hour=dispatched_service.hour,
day_of_week=dispatched_service.day_of_week,
day_of_month=dispatched_service.day_of_month,
from_time=next_run,
)

dispatched_service.next_run_at = next_run
dispatched_service.last_periodic_run = now

periodic_services_dispatched.append(dispatched_service)
return self._get_dispatch_payload(dispatched_service)

self.on_event(
periodic_services,
self._get_payload(now),
periodic_services_due,
_get_service_payload,
)

periodic_services.update(last_periodic_run=now)
if periodic_services_dispatched:
CorePeriodicService.objects.bulk_update(
periodic_services_dispatched, ["next_run_at", "last_periodic_run"]
)

def get_schema_name(self, service: CorePeriodicService) -> str:
return f"Periodic{service.id}Schema"
Expand All @@ -1415,7 +1435,11 @@ def generate_schema(
"properties": {
"triggered_at": {
"type": "string",
"title": _("Triggered at"),
"title": _("Previous scheduled run"),
},
"next_run_at": {
"type": "string",
"title": _("Next scheduled run"),
},
},
}
Expand Down
Loading
Loading