-
-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathprocessing.py
More file actions
80 lines (69 loc) · 2.24 KB
/
processing.py
File metadata and controls
80 lines (69 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from __future__ import annotations
import datetime
from collections.abc import Sequence
from typing import cast
from sqlalchemy import Connection, Row, text
def enqueue(run_id: int, expdb: Connection) -> None:
"""Insert a new pending processing entry for the given run."""
expdb.execute(
text(
"""
INSERT INTO processing_run(`run_id`, `status`, `date`)
VALUES (:run_id, 'pending', :date)
""",
),
parameters={"run_id": run_id, "date": datetime.datetime.now()},
)
def get_pending(expdb: Connection) -> Sequence[Row]:
"""Atomically claim all pending processing_run rows for this worker.
Uses an UPDATE ... WHERE status='pending' approach so that concurrent
workers don't double-process the same run. Claimed rows are set to
'processing' and this worker reads them back by that status.
"""
# Atomically mark pending rows as 'processing' so concurrent workers skip them
expdb.execute(
text(
"""
UPDATE processing_run
SET `status` = 'processing'
WHERE `status` = 'pending'
""",
),
)
return cast(
"Sequence[Row]",
expdb.execute(
text(
"""
SELECT `run_id`, `status`, `date`
FROM processing_run
WHERE `status` = 'processing'
ORDER BY `date` ASC
""",
),
).all(),
)
def mark_done(run_id: int, expdb: Connection) -> None:
"""Mark a processing_run entry as successfully completed."""
expdb.execute(
text(
"""
UPDATE processing_run
SET `status` = 'done'
WHERE `run_id` = :run_id
""",
),
parameters={"run_id": run_id},
)
def mark_error(run_id: int, error_message: str, expdb: Connection) -> None:
"""Mark a processing_run entry as failed and store the error message."""
expdb.execute(
text(
"""
UPDATE processing_run
SET `status` = 'error', `error` = :error_message
WHERE `run_id` = :run_id
""",
),
parameters={"run_id": run_id, "error_message": error_message},
)