Skip to content

Commit eacc671

Browse files
committed
[IMP] queue_job: prevent commit during queue job execution
This would release the job lock, causing spurious restarts by the dead jobs requeuer.
1 parent d2a8e90 commit eacc671

2 files changed

Lines changed: 52 additions & 10 deletions

File tree

queue_job/controllers/main.py

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import random
77
import time
88
import traceback
9+
from contextlib import contextmanager
910
from io import StringIO
1011

1112
from psycopg2 import OperationalError, errorcodes
@@ -26,6 +27,29 @@
2627
DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5
2728

2829

30+
@contextmanager
31+
def _prevent_commit(cr):
32+
"""Context manager to prevent commits on a cursor.
33+
34+
Commiting while the job is not finished would release the job lock, causing
35+
it to be started again by the dead jobs requeuer.
36+
"""
37+
38+
def forbidden_commit(*args, **kwargs):
39+
raise RuntimeError(
40+
"Commit is forbidden in queue jobs. "
41+
"If the current job is a cron running as queue job, "
42+
"modify it to run as a normal cron."
43+
)
44+
45+
original_commit = cr.commit
46+
cr.commit = forbidden_commit
47+
try:
48+
yield
49+
finally:
50+
cr.commit = original_commit
51+
52+
2953
class RunJobController(http.Controller):
3054
@classmethod
3155
def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
@@ -69,13 +93,16 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
6993
def _try_perform_job(cls, env, job):
7094
"""Try to perform the job, mark it done and commit if successful."""
7195
_logger.debug("%s started", job)
72-
job.perform()
73-
# Triggers any stored computed fields before calling 'set_done'
74-
# so that will be part of the 'exec_time'
75-
env.flush_all()
76-
job.set_done()
77-
job.store()
78-
env.flush_all()
96+
# TODO refactor, the relation between env and job.env is not clear
97+
assert env.cr is job.env.cr
98+
with _prevent_commit(env.cr):
99+
job.perform()
100+
# Triggers any stored computed fields before calling 'set_done'
101+
# so that will be part of the 'exec_time'
102+
env.flush_all()
103+
job.set_done()
104+
job.store()
105+
env.flush_all()
79106
env.cr.commit()
80107
_logger.debug("%s done", job)
81108

@@ -201,6 +228,7 @@ def create_test_job(
201228
size=1,
202229
failure_rate=0,
203230
job_duration=0,
231+
commit_within_job=False,
204232
):
205233
if not http.request.env.user.has_group("base.group_erp_manager"):
206234
raise Forbidden(http.request.env._("Access Denied"))
@@ -246,6 +274,7 @@ def create_test_job(
246274
description=description,
247275
failure_rate=failure_rate,
248276
job_duration=job_duration,
277+
commit_within_job=commit_within_job,
249278
)
250279

251280
if size > 1:
@@ -257,6 +286,7 @@ def create_test_job(
257286
description=description,
258287
failure_rate=failure_rate,
259288
job_duration=job_duration,
289+
commit_within_job=commit_within_job,
260290
)
261291
return ""
262292

@@ -269,6 +299,7 @@ def _create_single_test_job(
269299
size=1,
270300
failure_rate=0,
271301
job_duration=0,
302+
commit_within_job=False,
272303
):
273304
delayed = (
274305
http.request.env["queue.job"]
@@ -278,7 +309,11 @@ def _create_single_test_job(
278309
channel=channel,
279310
description=description,
280311
)
281-
._test_job(failure_rate=failure_rate, job_duration=job_duration)
312+
._test_job(
313+
failure_rate=failure_rate,
314+
job_duration=job_duration,
315+
commit_within_job=commit_within_job,
316+
)
282317
)
283318
return f"job uuid: {delayed.db_record().uuid}"
284319

@@ -293,6 +328,7 @@ def _create_graph_test_jobs(
293328
description="Test job",
294329
failure_rate=0,
295330
job_duration=0,
331+
commit_within_job=False,
296332
):
297333
model = http.request.env["queue.job"]
298334
current_count = 0
@@ -315,7 +351,11 @@ def _create_graph_test_jobs(
315351
max_retries=max_retries,
316352
channel=channel,
317353
description=f"{description} #{current_count}",
318-
)._test_job(failure_rate=failure_rate, job_duration=job_duration)
354+
)._test_job(
355+
failure_rate=failure_rate,
356+
job_duration=job_duration,
357+
commit_within_job=commit_within_job,
358+
)
319359
)
320360

321361
grouping = random.choice(possible_grouping_methods)

queue_job/models/queue_job.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,11 @@ def related_action_open_record(self):
458458
)
459459
return action
460460

461-
def _test_job(self, failure_rate=0, job_duration=0):
461+
def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False):
462462
_logger.info("Running test job.")
463463
if random.random() <= failure_rate:
464464
raise JobError("Job failed")
465465
if job_duration:
466466
time.sleep(job_duration)
467+
if commit_within_job:
468+
self.env.cr.commit()

0 commit comments

Comments
 (0)