66import random
77import time
88import traceback
9+ from contextlib import contextmanager
910from io import StringIO
1011from typing import Optional
1112
2627DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5
2728
2829
30+ @contextmanager
31+ def _prevent_commit (cr ):
32+ """Context manager to prevent commits on a cursor.
33+
34+ Commiting while the job is not finished would release the job lock, causing
35+ it to be started again by the dead jobs requeuer.
36+ """
37+
38+ def forbidden_commit (* args , ** kwargs ):
39+ raise RuntimeError (
40+ "Commit is forbidden in queue jobs. "
41+ "If the current job is a cron running as queue job, "
42+ "modify it to run as a normal cron."
43+ )
44+
45+ original_commit = cr .commit
46+ cr .commit = forbidden_commit
47+ try :
48+ yield
49+ finally :
50+ cr .commit = original_commit
51+
52+
2953class RunJobController (http .Controller ):
3054 @classmethod
3155 def _acquire_job (cls , env : api .Environment , job_uuid : str ) -> Optional [Job ]:
@@ -69,13 +93,16 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Optional[Job]:
6993 def _try_perform_job (cls , env , job ):
7094 """Try to perform the job, mark it done and commit if successful."""
7195 _logger .debug ("%s started" , job )
72- job .perform ()
73- # Triggers any stored computed fields before calling 'set_done'
74- # so that will be part of the 'exec_time'
75- env .flush_all ()
76- job .set_done ()
77- job .store ()
78- env .flush_all ()
96+ # TODO refactor, the relation between env and job.env is not clear
97+ assert env .cr is job .env .cr
98+ with _prevent_commit (env .cr ):
99+ job .perform ()
100+ # Triggers any stored computed fields before calling 'set_done'
101+ # so that will be part of the 'exec_time'
102+ env .flush_all ()
103+ job .set_done ()
104+ job .store ()
105+ env .flush_all ()
79106 env .cr .commit ()
80107 _logger .debug ("%s done" , job )
81108
@@ -212,6 +239,7 @@ def create_test_job(
212239 size = 1 ,
213240 failure_rate = 0 ,
214241 job_duration = 0 ,
242+ commit_within_job = False ,
215243 ):
216244 """Create test jobs
217245
@@ -267,6 +295,7 @@ def create_test_job(
267295 description = description ,
268296 failure_rate = failure_rate ,
269297 job_duration = job_duration ,
298+ commit_within_job = commit_within_job ,
270299 )
271300
272301 if size > 1 :
@@ -278,6 +307,7 @@ def create_test_job(
278307 description = description ,
279308 failure_rate = failure_rate ,
280309 job_duration = job_duration ,
310+ commit_within_job = commit_within_job ,
281311 )
282312 return ""
283313
@@ -290,6 +320,7 @@ def _create_single_test_job(
290320 size = 1 ,
291321 failure_rate = 0 ,
292322 job_duration = 0 ,
323+ commit_within_job = False ,
293324 ):
294325 delayed = (
295326 http .request .env ["queue.job" ]
@@ -299,7 +330,11 @@ def _create_single_test_job(
299330 channel = channel ,
300331 description = description ,
301332 )
302- ._test_job (failure_rate = failure_rate , job_duration = job_duration )
333+ ._test_job (
334+ failure_rate = failure_rate ,
335+ job_duration = job_duration ,
336+ commit_within_job = commit_within_job ,
337+ )
303338 )
304339 return "job uuid: %s" % (delayed .db_record ().uuid ,)
305340
@@ -314,6 +349,7 @@ def _create_graph_test_jobs(
314349 description = "Test job" ,
315350 failure_rate = 0 ,
316351 job_duration = 0 ,
352+ commit_within_job = False ,
317353 ):
318354 model = http .request .env ["queue.job" ]
319355 current_count = 0
@@ -336,7 +372,11 @@ def _create_graph_test_jobs(
336372 max_retries = max_retries ,
337373 channel = channel ,
338374 description = "%s #%d" % (description , current_count ),
339- )._test_job (failure_rate = failure_rate , job_duration = job_duration )
375+ )._test_job (
376+ failure_rate = failure_rate ,
377+ job_duration = job_duration ,
378+ commit_within_job = commit_within_job ,
379+ )
340380 )
341381
342382 grouping = random .choice (possible_grouping_methods )
0 commit comments