2727
2828
2929class RunJobController (http .Controller ):
30- def _try_perform_job (self , env , job ):
31- """Try to perform the job."""
30+ @classmethod
31+ def _acquire_job (cls , env : api .Environment , job_uuid : str ) -> Job | None :
32+ """Acquire a job for execution.
33+
34+ - make sure it is in ENQUEUED state
35+ - mark it as STARTED and commit the state change
36+ - acquire the job lock
37+
38+ If successful, return the Job instance, otherwise return None. This
39+ function may fail to acquire the job is not in the expected state or is
40+ already locked by another worker.
41+ """
42+ env .cr .execute (
43+ "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
44+ "FOR UPDATE SKIP LOCKED" ,
45+ (job_uuid , ENQUEUED ),
46+ )
47+ if not env .cr .fetchone ():
48+ _logger .warning (
49+ "was requested to run job %s, but it does not exist, "
50+ "or is not in state %s, or is being handled by another worker" ,
51+ job_uuid ,
52+ ENQUEUED ,
53+ )
54+ return None
55+ job = Job .load (env , job_uuid )
56+ assert job and job .state == ENQUEUED
3257 job .set_started ()
3358 job .store ()
3459 env .cr .commit ()
35- job .lock ()
60+ if not job .lock ():
61+ _logger .warning (
62+ "was requested to run job %s, but it could not be locked" ,
63+ job_uuid ,
64+ )
65+ return None
66+ return job
3667
68+ @classmethod
69+ def _try_perform_job (cls , env , job ):
70+ """Try to perform the job, mark it done and commit if successful."""
3771 _logger .debug ("%s started" , job )
38-
3972 job .perform ()
4073 # Triggers any stored computed fields before calling 'set_done'
4174 # so that will be part of the 'exec_time'
@@ -46,7 +79,8 @@ def _try_perform_job(self, env, job):
4679 env .cr .commit ()
4780 _logger .debug ("%s done" , job )
4881
49- def _enqueue_dependent_jobs (self , env , job ):
82+ @classmethod
83+ def _enqueue_dependent_jobs (cls , env , job ):
5084 tries = 0
5185 while True :
5286 try :
@@ -76,17 +110,8 @@ def _enqueue_dependent_jobs(self, env, job):
76110 else :
77111 break
78112
79- @http .route (
80- "/queue_job/runjob" ,
81- type = "http" ,
82- auth = "none" ,
83- save_session = False ,
84- readonly = False ,
85- )
86- def runjob (self , db , job_uuid , ** kw ):
87- http .request .session .db = db
88- env = http .request .env (user = SUPERUSER_ID )
89-
113+ @classmethod
114+ def _runjob (cls , env : api .Environment , job : Job ) -> None :
90115 def retry_postpone (job , message , seconds = None ):
91116 job .env .clear ()
92117 with Registry (job .env .cr .dbname ).cursor () as new_cr :
@@ -95,26 +120,9 @@ def retry_postpone(job, message, seconds=None):
95120 job .set_pending (reset_retry = False )
96121 job .store ()
97122
98- # ensure the job to run is in the correct state and lock the record
99- env .cr .execute (
100- "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE" ,
101- (job_uuid , ENQUEUED ),
102- )
103- if not env .cr .fetchone ():
104- _logger .warning (
105- "was requested to run job %s, but it does not exist, "
106- "or is not in state %s" ,
107- job_uuid ,
108- ENQUEUED ,
109- )
110- return ""
111-
112- job = Job .load (env , job_uuid )
113- assert job and job .state == ENQUEUED
114-
115123 try :
116124 try :
117- self ._try_perform_job (env , job )
125+ cls ._try_perform_job (env , job )
118126 except OperationalError as err :
119127 # Automatically retry the typical transaction serialization
120128 # errors
@@ -132,7 +140,6 @@ def retry_postpone(job, message, seconds=None):
132140 # traceback in the logs we should have the traceback when all
133141 # retries are exhausted
134142 env .cr .rollback ()
135- return ""
136143
137144 except (FailedJobError , Exception ) as orig_exception :
138145 buff = StringIO ()
@@ -142,19 +149,18 @@ def retry_postpone(job, message, seconds=None):
142149 job .env .clear ()
143150 with Registry (job .env .cr .dbname ).cursor () as new_cr :
144151 job .env = job .env (cr = new_cr )
145- vals = self ._get_failure_values (job , traceback_txt , orig_exception )
152+ vals = cls ._get_failure_values (job , traceback_txt , orig_exception )
146153 job .set_failed (** vals )
147154 job .store ()
148155 buff .close ()
149156 raise
150157
151158 _logger .debug ("%s enqueue depends started" , job )
152- self ._enqueue_dependent_jobs (env , job )
159+ cls ._enqueue_dependent_jobs (env , job )
153160 _logger .debug ("%s enqueue depends done" , job )
154161
155- return ""
156-
157- def _get_failure_values (self , job , traceback_txt , orig_exception ):
162+ @classmethod
163+ def _get_failure_values (cls , job , traceback_txt , orig_exception ):
158164 """Collect relevant data from exception."""
159165 exception_name = orig_exception .__class__ .__name__
160166 if hasattr (orig_exception , "__module__" ):
@@ -168,6 +174,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
168174 "exc_message" : exc_message ,
169175 }
170176
177+ @http .route (
178+ "/queue_job/runjob" ,
179+ type = "http" ,
180+ auth = "none" ,
181+ save_session = False ,
182+ readonly = False ,
183+ )
184+ def runjob (self , db , job_uuid , ** kw ):
185+ http .request .session .db = db
186+ env = http .request .env (user = SUPERUSER_ID )
187+ job = self ._acquire_job (env , job_uuid )
188+ if not job :
189+ return ""
190+ self ._runjob (env , job )
191+ return ""
192+
171193 # flake8: noqa: C901
172194 @http .route ("/queue_job/create_test_job" , type = "http" , auth = "user" )
173195 def create_test_job (
0 commit comments