diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 65adf98f472181..1cb1cc8063e58d 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -896,6 +896,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
     "sentry.preprod.vcs.pr_comments.tasks",
     "sentry.preprod.vcs.status_checks.size.tasks",
     "sentry.preprod.vcs.status_checks.snapshots.tasks",
+    "sentry.processing_errors.tasks",
     "sentry.profiles.task",
     "sentry.release_health.tasks",
     "sentry.relocation.tasks.process",
@@ -994,7 +995,11 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
         "task": "workflow_engine:sentry.workflow_engine.tasks.workflows.schedule_delayed_workflows",
         "schedule": timedelta(seconds=15),
     },
+    "resolve-stale-sourcemap-detectors": {
+        "task": "workflow_engine:sentry.processing_errors.tasks.resolve_stale_sourcemap_detectors",
+        "schedule": task_crontab("*/5", "*", "*", "*", "*"),
+    },
     "sync-options": {
         "task": "options:sentry.tasks.options.sync_options",
         "schedule": timedelta(seconds=10),
     },
diff --git a/src/sentry/processing_errors/tasks.py b/src/sentry/processing_errors/tasks.py
new file mode 100644
index 00000000000000..c16135790de526
--- /dev/null
+++ b/src/sentry/processing_errors/tasks.py
@@ -0,0 +1,103 @@
+from __future__ import annotations
+
+import logging
+from datetime import timedelta
+
+from django.utils import timezone
+
+from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
+from sentry.issues.status_change_message import StatusChangeMessage
+from sentry.models.group import GroupStatus
+from sentry.processing_errors.detection import _redis_key_triggered
+from sentry.processing_errors.grouptype import SourcemapConfigurationType
+from sentry.silo.base import SiloMode
+from sentry.tasks.base import instrumented_task
+from sentry.taskworker.namespaces import workflow_engine_tasks
+from sentry.utils import metrics
+from sentry.workflow_engine.handlers.detector.stateful import (
+    StatefulDetectorHandler,
+    get_redis_client,
+)
+from sentry.workflow_engine.models import DetectorState
+from sentry.workflow_engine.types import DetectorPriorityLevel
+
+logger = logging.getLogger(__name__)
+
+# Detectors with no new failures for this long are resolved.
+# Must be >> REFRESH_INTERVAL_SECONDS (5 min) in detection.py.
+STALENESS_THRESHOLD_MINUTES = 30
+
+# Ignore rows older than this to avoid scanning unrelated data.
+MAX_AGE_DAYS = 1
+
+
+@instrumented_task(
+    name="sentry.processing_errors.tasks.resolve_stale_sourcemap_detectors",
+    namespace=workflow_engine_tasks,
+    silo_mode=SiloMode.REGION,
+)
+def resolve_stale_sourcemap_detectors() -> None:
+    """
+    Periodic task that resolves sourcemap configuration detectors
+    whose date_updated is older than the staleness threshold,
+    indicating that the sourcemap issue is no longer occurring.
+    """
+    now = timezone.now()
+    stale_states = DetectorState.objects.filter(
+        detector__type=SourcemapConfigurationType.slug,
+        is_triggered=True,
+        date_updated__lt=now - timedelta(minutes=STALENESS_THRESHOLD_MINUTES),
+        date_updated__gt=now - timedelta(days=MAX_AGE_DAYS),
+    ).select_related("detector")
+
+    for state in stale_states:
+        _resolve_detector(state)
+
+
+def _resolve_detector(state: DetectorState) -> None:
+    """
+    Atomically resolve a single triggered DetectorState and produce
+    a StatusChangeMessage so the issue platform marks it resolved.
+    """
+    now = timezone.now()
+
+    rows = DetectorState.objects.filter(
+        id=state.id,
+        is_triggered=True,
+        date_updated__lt=now,
+    ).update(
+        is_triggered=False,
+        state=DetectorPriorityLevel.OK,
+        date_updated=now,
+    )
+
+    if not rows:
+        return
+
+    handler = state.detector.detector_handler
+    if not isinstance(handler, StatefulDetectorHandler):
+        logger.error("No handler for detector %s", state.detector.id)
+        return
+
+    fingerprint = [
+        *handler.build_issue_fingerprint(),
+        handler.state_manager.build_key(None),
+    ]
+
+    status_change = StatusChangeMessage(
+        fingerprint=fingerprint,
+        project_id=state.detector.project_id,
+        new_status=GroupStatus.RESOLVED,
+        new_substatus=None,
+        detector_id=state.detector.id,
+    )
+
+    produce_occurrence_to_kafka(
+        payload_type=PayloadType.STATUS_CHANGE,
+        status_change=status_change,
+    )
+
+    # Clear Redis triggered cache so detection can re-trigger if the problem returns
+    get_redis_client().delete(_redis_key_triggered(state.detector.project_id))
+
+    metrics.incr("processing_errors.sourcemap_detector.resolved")
diff --git a/tests/sentry/processing_errors/test_tasks.py b/tests/sentry/processing_errors/test_tasks.py
new file mode 100644
index 00000000000000..3e88c25c245d25
--- /dev/null
+++ b/tests/sentry/processing_errors/test_tasks.py
@@ -0,0 +1,102 @@
+from __future__ import annotations
+
+from datetime import timedelta
+from unittest.mock import patch
+
+from django.utils import timezone
+
+from sentry.processing_errors.detection import _redis_key_triggered
+from sentry.processing_errors.grouptype import SourcemapCheckStatus, SourcemapConfigurationType
+from sentry.processing_errors.tasks import (
+    STALENESS_THRESHOLD_MINUTES,
+    resolve_stale_sourcemap_detectors,
+)
+from sentry.testutils.cases import TestCase
+from sentry.workflow_engine.handlers.detector.stateful import get_redis_client
+from sentry.workflow_engine.models import DataConditionGroup, Detector, DetectorState
+from sentry.workflow_engine.models.data_condition import Condition, DataCondition
+from sentry.workflow_engine.types import DetectorPriorityLevel
+
+
+class TestResolveStaleSourcemapDetectors(TestCase):
+    def _create_triggered_detector(self, date_updated=None):
+        """Create a detector with a triggered DetectorState."""
+        condition_group = DataConditionGroup.objects.create(
+            logic_type=DataConditionGroup.Type.ANY,
+            organization_id=self.project.organization_id,
+        )
+
+        DataCondition.objects.create(
+            comparison=SourcemapCheckStatus.FAILURE,
+            type=Condition.EQUAL,
+            condition_result=DetectorPriorityLevel.HIGH,
+            condition_group=condition_group,
+        )
+
+        DataCondition.objects.create(
+            comparison=SourcemapCheckStatus.SUCCESS,
+            type=Condition.EQUAL,
+            condition_result=DetectorPriorityLevel.OK,
+            condition_group=condition_group,
+        )
+
+        detector = Detector.objects.create(
+            type=SourcemapConfigurationType.slug,
+            project=self.project,
+            name="Sourcemap Configuration",
+            config={},
+            workflow_condition_group=condition_group,
+        )
+
+        state = DetectorState.objects.create(
+            detector=detector,
+            detector_group_key=None,
+            is_triggered=True,
+            state=DetectorPriorityLevel.HIGH,
+        )
+
+        if date_updated is not None:
+            DetectorState.objects.filter(id=state.id).update(date_updated=date_updated)
+            state.refresh_from_db()
+
+        get_redis_client().set(_redis_key_triggered(self.project.id), "1", ex=3600)
+
+        return detector, state
+
+    @patch("sentry.processing_errors.tasks.produce_occurrence_to_kafka")
+    def test_resolves_stale_detector(self, mock_produce) -> None:
+        stale_time = timezone.now() - timedelta(minutes=STALENESS_THRESHOLD_MINUTES + 5)
+        detector, state = self._create_triggered_detector(date_updated=stale_time)
+        resolve_stale_sourcemap_detectors()
+        state.refresh_from_db()
+        assert state.is_triggered is False
+        assert state.state == str(DetectorPriorityLevel.OK)
+        assert get_redis_client().get(_redis_key_triggered(self.project.id)) is None
+        mock_produce.assert_called_once()
+        call_kwargs = mock_produce.call_args[1]
+        assert call_kwargs["status_change"].project_id == self.project.id
+        assert call_kwargs["status_change"].fingerprint == [
+            f"{self.project.id}:sourcemap",
+            f"detector:{detector.id}",
+        ]
+
+    @patch("sentry.processing_errors.tasks.produce_occurrence_to_kafka")
+    def test_skips_recently_updated(self, mock_produce) -> None:
+        _detector, state = self._create_triggered_detector()
+        resolve_stale_sourcemap_detectors()
+        state.refresh_from_db()
+        assert state.is_triggered is True
+        mock_produce.assert_not_called()
+
+    @patch("sentry.processing_errors.tasks.produce_occurrence_to_kafka")
+    def test_skips_already_resolved(self, mock_produce) -> None:
+        stale_time = timezone.now() - timedelta(minutes=STALENESS_THRESHOLD_MINUTES + 5)
+        _detector, state = self._create_triggered_detector(date_updated=stale_time)
+
+        DetectorState.objects.filter(id=state.id).update(
+            is_triggered=False,
+            state=DetectorPriorityLevel.OK,
+        )
+
+        resolve_stale_sourcemap_detectors()
+        mock_produce.assert_not_called()