Skip to content

Commit 6bee330

Browse files
MU-SoftwareCopilot
andcommitted
feat: Celery 통합 및 작업 큐 설정 추가
Co-authored-by: Copilot <copilot@github.com>
1 parent c34d573 commit 6bee330

14 files changed

Lines changed: 325 additions & 27 deletions

File tree

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ local-setup:
4444
local-api: local-collectstatic
4545
@ENV_PATH=envfile/.env.local uv run python app/manage.py runserver 8000
4646

47+
# Run local Celery worker (requires `make docker-compose-up` for redis)
48+
local-worker:
49+
@cd app && ENV_PATH=../envfile/.env.local uv run celery -A core worker -l INFO --concurrency=4
50+
4751
# Run django collectstatic
4852
local-collectstatic:
4953
@ENV_PATH=envfile/.env.local uv run python app/manage.py collectstatic --noinput

app/admin_api/test/notification_test.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def test_kakao_template_delete_is_405(api_client, kakao_template):
179179
# ---- History create (POST /history/) ----------------------------------------
180180

181181

182-
@pytest.mark.django_db
182+
@pytest.mark.django_db(transaction=True)
183183
def test_create_history_via_template_creates_sent_to_and_sends(api_client, sms_template):
184184
with patch("notification.models.nhn_cloud_sms.NHNCloudSMSNotificationHistory.client"):
185185
response = api_client.post(
@@ -200,7 +200,7 @@ def test_create_history_via_template_creates_sent_to_and_sends(api_client, sms_t
200200
assert sent_to["status"] == NotificationStatus.SENT
201201

202202

203-
@pytest.mark.django_db
203+
@pytest.mark.django_db(transaction=True)
204204
def test_create_history_marks_sent_to_failed_when_send_raises(api_client, sms_template):
205205
with patch("notification.models.nhn_cloud_sms.NHNCloudSMSNotificationHistory.client") as mock_client:
206206
mock_client.send_message.side_effect = RuntimeError("external api down")
@@ -218,7 +218,7 @@ def test_create_history_marks_sent_to_failed_when_send_raises(api_client, sms_te
218218
assert body["sent_to_list"][0]["status"] == NotificationStatus.FAILED
219219

220220

221-
@pytest.mark.django_db
221+
@pytest.mark.django_db(transaction=True)
222222
def test_create_history_with_multiple_recipients_and_per_recipient_context(api_client, sms_template):
223223
# 한 번의 요청으로 여러 수신자에게 서로 다른 context로 발송, 결과는 같은 history로 묶여 조회됨.
224224
with patch("notification.models.nhn_cloud_sms.NHNCloudSMSNotificationHistory.client") as mock_client:
@@ -240,7 +240,7 @@ def test_create_history_with_multiple_recipients_and_per_recipient_context(api_c
240240
assert {s["recipient"] for s in body["sent_to_list"]} == {"01000000001", "01000000002"}
241241

242242

243-
@pytest.mark.django_db
243+
@pytest.mark.django_db(transaction=True)
244244
def test_create_history_templateless_email(api_client):
245245
# 템플릿 없이 template_data + sent_from 직접 입력해 발송.
246246
with patch("notification.models.email.EmailNotificationHistory.client"):
@@ -306,7 +306,7 @@ def test_history_list_filter_by_template(api_client, email_template, superuser):
306306
assert ids == [str(matching.id)]
307307

308308

309-
@pytest.mark.django_db
309+
@pytest.mark.django_db(transaction=True)
310310
def test_history_list_filter_by_created_by_username(api_client, email_template, superuser):
311311
# API 경로로 history를 만들어야 BaseAbstractModelQuerySet.create()의 get_current_user()가
312312
# 인증된 superuser를 created_by로 잡음.
@@ -330,7 +330,7 @@ def test_history_list_filter_by_created_by_username(api_client, email_template,
330330
# ---- History Retry ----------------------------------------------------------
331331

332332

333-
@pytest.mark.django_db
333+
@pytest.mark.django_db(transaction=True)
334334
def test_retry_resends_only_failed_sent_to(api_client, email_history):
335335
# 한 history 안에 SENT/FAILED가 섞여 있을 때 retry는 FAILED만 재시도.
336336
extra = EmailNotificationHistory.objects.create_for_recipients(
@@ -352,7 +352,7 @@ def test_retry_resends_only_failed_sent_to(api_client, email_history):
352352
assert email_history.sent_to_list.get().status == NotificationStatus.SENT
353353

354354

355-
@pytest.mark.django_db
355+
@pytest.mark.django_db(transaction=True)
356356
def test_retry_keeps_sent_to_failed_when_send_raises(api_client, email_history):
357357
email_history.sent_to_list.update(status=NotificationStatus.FAILED)
358358

app/conftest.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.test import override_settings
2+
from pytest import fixture
3+
4+
5+
@fixture(autouse=True)
6+
def _celery_eager():
7+
with override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=False):
8+
yield

app/core/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from os import environ
2+
3+
from celery import Celery
4+
5+
environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
6+
7+
celery_app = Celery("pyconkr")
8+
celery_app.config_from_object("django.conf:settings", namespace="CELERY")
9+
celery_app.autodiscover_tasks()

app/core/settings.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,15 @@
409409
),
410410
)
411411

412+
# Celery Settings
413+
CELERY_BROKER_URL = env("CELERY_BROKER_URL")
414+
CELERY_RESULT_BACKEND = env("CELERY_RESULT_BACKEND")
415+
CELERY_TASK_ACKS_LATE = True
416+
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
417+
CELERY_TIMEZONE = TIME_ZONE
418+
CELERY_TASK_SOFT_TIME_LIMIT = 60
419+
CELERY_TASK_TIME_LIMIT = 90
420+
412421
# Sentry Settings
413422
if SENTRY_DSN := env("SENTRY_DSN", default=""):
414423
SENTRY_TRACES_SAMPLE_RATE = env.float("SENTRY_TRACES_SAMPLE_RATE", default=1.0)

app/notification/models/base.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,26 +169,20 @@ def sent_to_status_summary(self) -> dict[str, int]:
169169
counts = dict(self.sent_to_list.values("status").annotate(n=models.Count("*")).values_list("status", "n"))
170170
return {status.value.lower(): counts.get(status.value, 0) for status in NotificationStatus}
171171

172-
def _send_each(self, sent_to_qs: "models.QuerySet[NotificationHistorySentToBase]") -> None:
173-
# sent_to.history reverse FK가 매번 재조회되지 않도록 self를 직접 attach (N+1 회피).
174-
for sent_to in sent_to_qs:
175-
sent_to.history = self
176-
try:
177-
sent_to.send()
178-
except Exception:
179-
# 내부 send()가 catch+log하지 못한 경로(상태 저장 실패 등)는 status가 FAILED로 남지 않음 — 추가 로깅.
180-
if sent_to.status != NotificationStatus.FAILED:
181-
slack_logger.exception(
182-
"Batch send unexpected error: history_id=%s recipient=%s",
183-
self.id,
184-
sent_to.recipient,
185-
)
172+
@transaction.atomic
173+
def _dispatch(self, sent_to_qs: "models.QuerySet[NotificationHistorySentToBase]") -> None:
174+
from notification.tasks import send_notification_to_recipient
175+
176+
if not (sent_to_ids := list(sent_to_qs.values_list("id", flat=True))):
177+
return
178+
label = type(self).sent_to_class._meta.label_lower
179+
transaction.on_commit(lambda: [send_notification_to_recipient.delay(label, sid) for sid in sent_to_ids])
186180

187181
def send(self) -> None:
188-
self._send_each(self.sent_to_list.all())
182+
self._dispatch(self.sent_to_list.all())
189183

190184
def retry(self) -> None:
191-
self._send_each(self.sent_to_list.filter(status=NotificationStatus.FAILED))
185+
self._dispatch(self.sent_to_list.filter(status=NotificationStatus.FAILED))
192186

193187

194188
class NotificationHistorySentToBase(BaseAbstractModel):

app/notification/tasks.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from celery import shared_task
2+
from django.apps import apps
3+
from notification.models.base import NotificationStatus, slack_logger
4+
5+
6+
@shared_task(ignore_result=True)
7+
def send_notification_to_recipient(model_label: str, sent_to_id: str) -> None:
8+
sent_to_class = apps.get_model(model_label)
9+
sent_to = sent_to_class.objects.select_related("history").get(pk=sent_to_id)
10+
if sent_to.status not in (NotificationStatus.CREATED, NotificationStatus.FAILED):
11+
return
12+
13+
try:
14+
sent_to.send()
15+
except Exception:
16+
sent_to.refresh_from_db(fields=["status"])
17+
if sent_to.status != NotificationStatus.FAILED:
18+
slack_logger.exception(
19+
"Batch send unexpected error: history_id=%s recipient=%s",
20+
sent_to.history_id,
21+
sent_to.recipient,
22+
)
23+
raise

app/notification/test/history_send_test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def test_send_fails_fast_when_context_missing_template_variables(system_user):
121121
assert sent_to.status == NotificationStatus.FAILED
122122

123123

124-
@pytest.mark.django_db
124+
@pytest.mark.django_db(transaction=True)
125125
def test_history_send_iterates_all_sent_to_and_swallows_individual_failures(system_user, email_template):
126126
history = EmailNotificationHistory.objects.create_for_recipients(
127127
template=email_template,
@@ -156,7 +156,7 @@ def test_sent_to_status_summary_counts_by_status(email_template):
156156
assert summary == {"created": 1, "sending": 0, "sent": 1, "failed": 1}
157157

158158

159-
@pytest.mark.django_db
159+
@pytest.mark.django_db(transaction=True)
160160
def test_history_send_logs_unexpected_errors_outside_inner_try(email_template, caplog):
161161
# SentTo.send() 내부 try 밖에서 발생한 예외(예: status save 실패)는 inner catch+log에 안 잡힘 →
162162
# _send_each가 batch를 계속 진행하면서 상위에서 추가 로깅하는지 확인.
@@ -170,7 +170,7 @@ def test_history_send_logs_unexpected_errors_outside_inner_try(email_template, c
170170
assert records[0].exc_info is not None
171171

172172

173-
@pytest.mark.django_db
173+
@pytest.mark.django_db(transaction=True)
174174
def test_history_retry_skips_non_failed_sent_to(email_template):
175175
# FAILED 상태가 아닌 sent_to는 재시도 대상에서 제외 — 외부 호출이 발생하지 않음.
176176
history = _create_history(email_template)
@@ -179,7 +179,7 @@ def test_history_retry_skips_non_failed_sent_to(email_template):
179179
mock_client.send_message.assert_not_called()
180180

181181

182-
@pytest.mark.django_db
182+
@pytest.mark.django_db(transaction=True)
183183
def test_history_retry_resends_failed_sent_to(email_template):
184184
history = _create_history(email_template)
185185
history.sent_to_list.update(status=NotificationStatus.FAILED)
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import logging
2+
from unittest.mock import patch
3+
4+
import pytest
5+
from notification.models import EmailNotificationHistory, EmailNotificationHistorySentTo, EmailNotificationTemplate
6+
from notification.models.base import NotificationStatus
7+
from notification.tasks import send_notification_to_recipient
8+
from user.models import UserExt
9+
10+
LABEL = EmailNotificationHistorySentTo._meta.label_lower
11+
12+
13+
@pytest.fixture
14+
def system_user(db):
15+
return UserExt.get_system_user()
16+
17+
18+
@pytest.fixture
19+
def email_template(system_user):
20+
return EmailNotificationTemplate.objects.create(
21+
code="t",
22+
title="t",
23+
sent_from="from@example.com",
24+
data='{"title":"hi","from_":"f","send_to":"r","body":"b"}',
25+
created_by=system_user,
26+
updated_by=system_user,
27+
)
28+
29+
30+
@pytest.fixture
31+
def sent_to(email_template):
32+
history = EmailNotificationHistory.objects.create_for_recipients(
33+
template=email_template,
34+
recipients=[{"recipient": "to@example.com"}],
35+
)
36+
return history.sent_to_list.get()
37+
38+
39+
@pytest.mark.django_db
40+
def test_task_sends_created_row(sent_to):
41+
with patch.object(EmailNotificationHistory, "client") as mock_client:
42+
send_notification_to_recipient(LABEL, sent_to.id)
43+
mock_client.send_message.assert_called_once()
44+
sent_to.refresh_from_db()
45+
assert sent_to.status == NotificationStatus.SENT
46+
47+
48+
@pytest.mark.django_db
49+
def test_task_resends_failed_row(sent_to):
50+
sent_to.status = NotificationStatus.FAILED
51+
sent_to.save(update_fields=["status"])
52+
with patch.object(EmailNotificationHistory, "client"):
53+
send_notification_to_recipient(LABEL, sent_to.id)
54+
sent_to.refresh_from_db()
55+
assert sent_to.status == NotificationStatus.SENT
56+
57+
58+
@pytest.mark.django_db
59+
def test_task_skips_sending_row_to_prevent_duplicate_send(sent_to):
60+
# 동시성/재배달 가드 — SENDING 중인 row를 다른 워커가 받아도 외부 호출이 일어나지 않아야 함.
61+
sent_to.status = NotificationStatus.SENDING
62+
sent_to.save(update_fields=["status"])
63+
with patch.object(EmailNotificationHistory, "client") as mock_client:
64+
send_notification_to_recipient(LABEL, sent_to.id)
65+
mock_client.send_message.assert_not_called()
66+
sent_to.refresh_from_db()
67+
assert sent_to.status == NotificationStatus.SENDING
68+
69+
70+
@pytest.mark.django_db
71+
def test_task_skips_sent_row_to_prevent_duplicate_send(sent_to):
72+
sent_to.status = NotificationStatus.SENT
73+
sent_to.save(update_fields=["status"])
74+
with patch.object(EmailNotificationHistory, "client") as mock_client:
75+
send_notification_to_recipient(LABEL, sent_to.id)
76+
mock_client.send_message.assert_not_called()
77+
78+
79+
@pytest.mark.django_db
80+
def test_task_marks_failed_and_propagates_external_failure(sent_to):
81+
with patch.object(EmailNotificationHistory, "client") as mock_client:
82+
mock_client.send_message.side_effect = RuntimeError("api down")
83+
with pytest.raises(RuntimeError, match="api down"):
84+
send_notification_to_recipient(LABEL, sent_to.id)
85+
sent_to.refresh_from_db()
86+
assert sent_to.status == NotificationStatus.FAILED
87+
88+
89+
@pytest.mark.django_db
90+
def test_task_logs_unexpected_error_when_inner_save_fails(sent_to, caplog):
91+
# send() 내부 try가 못 잡는 경로(첫 status save 실패) — task가 "Batch send unexpected error"로 추가 로깅.
92+
with patch.object(EmailNotificationHistorySentTo, "save", side_effect=RuntimeError("db down")):
93+
with caplog.at_level(logging.ERROR, logger="slack_logger"):
94+
with pytest.raises(RuntimeError, match="db down"):
95+
send_notification_to_recipient(LABEL, sent_to.id)
96+
records = [r for r in caplog.records if "Batch send unexpected" in r.getMessage()]
97+
assert len(records) == 1
98+
assert records[0].exc_info is not None

envfile/.env.docker

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
DATABASE_HOST=host.docker.internal
2+
CELERY_BROKER_URL=redis://host.docker.internal:46379/0
3+
CELERY_RESULT_BACKEND=redis://host.docker.internal:46379/1

0 commit comments

Comments
 (0)