Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.preprod.vcs.pr_comments.tasks",
"sentry.preprod.vcs.status_checks.size.tasks",
"sentry.preprod.vcs.status_checks.snapshots.tasks",
"sentry.processing_errors.tasks",
"sentry.profiles.task",
"sentry.release_health.tasks",
"sentry.relocation.tasks.process",
Expand Down Expand Up @@ -994,6 +995,10 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"task": "workflow_engine:sentry.workflow_engine.tasks.workflows.schedule_delayed_workflows",
"schedule": timedelta(seconds=15),
},
"resolve-stale-sourcemap-detectors": {
"task": "workflow_engine:sentry.processing_errors.tasks.resolve_stale_sourcemap_detectors",
"schedule": task_crontab("*/5", "*", "*", "*", "*"),
},
"sync-options": {
"task": "options:sentry.tasks.options.sync_options",
"schedule": timedelta(seconds=10),
Expand Down
103 changes: 103 additions & 0 deletions src/sentry/processing_errors/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

import logging
from datetime import timedelta

from django.utils import timezone

from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.issues.status_change_message import StatusChangeMessage
from sentry.models.group import GroupStatus
from sentry.processing_errors.detection import _redis_key_triggered
from sentry.processing_errors.grouptype import SourcemapConfigurationType
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import workflow_engine_tasks
from sentry.utils import metrics
from sentry.workflow_engine.handlers.detector.stateful import (
StatefulDetectorHandler,
get_redis_client,
)
from sentry.workflow_engine.models import DetectorState
from sentry.workflow_engine.types import DetectorPriorityLevel

logger = logging.getLogger(__name__)

# Detectors with no new failures for this long are resolved.
# Must be >> REFRESH_INTERVAL_SECONDS (5 min) in detection.py.
STALENESS_THRESHOLD_MINUTES = 30

# Ignore rows older than this to avoid scanning unrelated data.
MAX_AGE_DAYS = 1


@instrumented_task(
    name="sentry.processing_errors.tasks.resolve_stale_sourcemap_detectors",
    namespace=workflow_engine_tasks,
    silo_mode=SiloMode.REGION,
)
def resolve_stale_sourcemap_detectors() -> None:
    """
    Periodically resolve sourcemap configuration detectors that have gone
    quiet.

    A detector state whose ``date_updated`` is older than
    ``STALENESS_THRESHOLD_MINUTES`` has seen no new failures for that long,
    so the underlying sourcemap issue is treated as no longer occurring.
    Rows older than ``MAX_AGE_DAYS`` are ignored to keep the scan bounded.
    """
    current_time = timezone.now()
    stale_before = current_time - timedelta(minutes=STALENESS_THRESHOLD_MINUTES)
    ignore_before = current_time - timedelta(days=MAX_AGE_DAYS)

    # select_related avoids one extra query per row when _resolve_detector
    # reads state.detector.
    candidates = DetectorState.objects.filter(
        detector__type=SourcemapConfigurationType.slug,
        is_triggered=True,
        date_updated__lt=stale_before,
        date_updated__gt=ignore_before,
    ).select_related("detector")

    for candidate in candidates:
        _resolve_detector(candidate)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many are you expecting here? Wondering if this should be a fanned-out task?

Copy link
Member Author

@wedamija wedamija Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we'd have 10s to 100s at most. Most of the time, either the detector will be active because more errors are coming in, or totally inactive because they have sourcemaps set up right. I think we're better off starting out simple here and adding fan out if we find there's a problem.



def _resolve_detector(state: DetectorState) -> None:
    """
    Atomically resolve a single triggered DetectorState and produce
    a StatusChangeMessage so the issue platform marks it resolved.

    Resolution has three coupled side effects: the DB row flip, the Kafka
    status-change message, and the Redis trigger-cache cleanup. Validation
    happens before any of them so a failure leaves everything untouched.
    """
    # Fix: check the handler BEFORE mutating anything. Previously the DB row
    # was flipped to resolved first, so a missing/non-stateful handler left
    # the row resolved while the issue stayed open and the Redis trigger key
    # stayed set (blocking re-detection).
    handler = state.detector.detector_handler
    if not isinstance(handler, StatefulDetectorHandler):
        logger.error("No handler for detector %s", state.detector.id)
        return

    now = timezone.now()

    # Conditional update acts as an optimistic lock: if a concurrent process
    # re-triggered or already resolved this state since the caller queried it,
    # zero rows match and we do nothing.
    rows = DetectorState.objects.filter(
        id=state.id,
        is_triggered=True,
        date_updated__lt=now,
    ).update(
        is_triggered=False,
        state=DetectorPriorityLevel.OK,
        date_updated=now,
    )

    if not rows:
        return

    # Must mirror the fingerprint the stateful handler uses when emitting
    # occurrences, so the status change targets the same issue group.
    fingerprint = [
        *handler.build_issue_fingerprint(),
        handler.state_manager.build_key(None),
    ]

    status_change = StatusChangeMessage(
        fingerprint=fingerprint,
        project_id=state.detector.project_id,
        new_status=GroupStatus.RESOLVED,
        new_substatus=None,
        detector_id=state.detector.id,
    )

    produce_occurrence_to_kafka(
        payload_type=PayloadType.STATUS_CHANGE,
        status_change=status_change,
    )

    # Clear Redis triggered cache so detection can re-trigger if the problem returns
    get_redis_client().delete(_redis_key_triggered(state.detector.project_id))

    metrics.incr("processing_errors.sourcemap_detector.resolved")
102 changes: 102 additions & 0 deletions tests/sentry/processing_errors/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from __future__ import annotations

from datetime import timedelta
from unittest.mock import patch

from django.utils import timezone

from sentry.processing_errors.detection import _redis_key_triggered
from sentry.processing_errors.grouptype import SourcemapCheckStatus, SourcemapConfigurationType
from sentry.processing_errors.tasks import (
STALENESS_THRESHOLD_MINUTES,
resolve_stale_sourcemap_detectors,
)
from sentry.testutils.cases import TestCase
from sentry.workflow_engine.handlers.detector.stateful import get_redis_client
from sentry.workflow_engine.models import DataConditionGroup, Detector, DetectorState
from sentry.workflow_engine.models.data_condition import Condition, DataCondition
from sentry.workflow_engine.types import DetectorPriorityLevel


class TestResolveStaleSourcemapDetectors(TestCase):
    def _create_triggered_detector(self, date_updated=None):
        """Build a sourcemap detector whose DetectorState is triggered."""
        group = DataConditionGroup.objects.create(
            logic_type=DataConditionGroup.Type.ANY,
            organization_id=self.project.organization_id,
        )

        # One condition per outcome: a FAILURE check escalates, a SUCCESS
        # check resolves.
        for comparison, result in (
            (SourcemapCheckStatus.FAILURE, DetectorPriorityLevel.HIGH),
            (SourcemapCheckStatus.SUCCESS, DetectorPriorityLevel.OK),
        ):
            DataCondition.objects.create(
                comparison=comparison,
                type=Condition.EQUAL,
                condition_result=result,
                condition_group=group,
            )

        detector = Detector.objects.create(
            type=SourcemapConfigurationType.slug,
            project=self.project,
            name="Sourcemap Configuration",
            config={},
            workflow_condition_group=group,
        )

        state = DetectorState.objects.create(
            detector=detector,
            detector_group_key=None,
            is_triggered=True,
            state=DetectorPriorityLevel.HIGH,
        )

        # Backdate via a queryset update so model save logic can't overwrite
        # the timestamp, then reload the instance to pick it up.
        if date_updated is not None:
            DetectorState.objects.filter(id=state.id).update(date_updated=date_updated)
            state.refresh_from_db()

        get_redis_client().set(_redis_key_triggered(self.project.id), "1", ex=3600)

        return detector, state

    @patch("sentry.processing_errors.tasks.produce_occurrence_to_kafka")
    def test_resolves_stale_detector(self, mock_produce) -> None:
        past = timezone.now() - timedelta(minutes=STALENESS_THRESHOLD_MINUTES + 5)
        detector, state = self._create_triggered_detector(date_updated=past)

        resolve_stale_sourcemap_detectors()

        state.refresh_from_db()
        assert state.is_triggered is False
        assert state.state == str(DetectorPriorityLevel.OK)
        assert get_redis_client().get(_redis_key_triggered(self.project.id)) is None

        mock_produce.assert_called_once()
        status_change = mock_produce.call_args.kwargs["status_change"]
        assert status_change.project_id == self.project.id
        assert status_change.fingerprint == [
            f"{self.project.id}:sourcemap",
            f"detector:{detector.id}",
        ]

    @patch("sentry.processing_errors.tasks.produce_occurrence_to_kafka")
    def test_skips_recently_updated(self, mock_produce) -> None:
        # date_updated defaults to "now", which is inside the threshold.
        _detector, state = self._create_triggered_detector()

        resolve_stale_sourcemap_detectors()

        state.refresh_from_db()
        assert state.is_triggered is True
        mock_produce.assert_not_called()

    @patch("sentry.processing_errors.tasks.produce_occurrence_to_kafka")
    def test_skips_already_resolved(self, mock_produce) -> None:
        past = timezone.now() - timedelta(minutes=STALENESS_THRESHOLD_MINUTES + 5)
        _detector, state = self._create_triggered_detector(date_updated=past)

        # Pre-resolve the state; the task must leave it alone.
        DetectorState.objects.filter(id=state.id).update(
            is_triggered=False,
            state=DetectorPriorityLevel.OK,
        )

        resolve_stale_sourcemap_detectors()
        mock_produce.assert_not_called()
Loading