Skip to content

Commit f98c4af

Browse files
committed
Fix 'around thread' callback ordering
PR Betterment#42 inadvertently flipped the order of `:thread` and `:perform`, and also pushed `:thread` far enough in that cleanup steps would happen after the `with_connection`'s `end` block in the connection plugin. This introduces the possibility of thread safety issues, or connections being held longer than intended (exhausting the connection pool / connection limits).
1 parent b5abf39 commit f98c4af

2 files changed

Lines changed: 9 additions & 12 deletions

File tree

lib/delayed.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ module Delayed
4242
mattr_accessor(:default_log_level) { 'info'.freeze }
4343
mattr_accessor(:plugins) do
4444
[
45-
Delayed::Plugins::Instrumentation,
4645
Delayed::Plugins::Connection,
46+
Delayed::Plugins::Instrumentation,
4747
]
4848
end
4949

lib/delayed/worker.rb

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,12 @@ def work_off(num = 100)
101101
pool = Concurrent::FixedThreadPool.new(jobs.length)
102102
jobs.each do |job|
103103
pool.post do
104-
success.increment if run_job(job)
104+
self.class.lifecycle.run_callbacks(:thread, self, job) do
105+
success.increment if perform(job)
106+
end
107+
rescue Exception => e # rubocop:disable Lint/RescueException
108+
job_say job, "Job thread crashed with #{e.class.name}: #{e.message}", 'error'
109+
job.error = e
105110
end
106111
end
107112

@@ -117,12 +122,8 @@ def work_off(num = 100)
117122
[success.value, total - success.value]
118123
end
119124

120-
def run_thread_callbacks(job, &block)
121-
self.class.lifecycle.run_callbacks(:thread, self, job, &block)
122-
end
123-
124-
def run(job)
125-
run_thread_callbacks(job) do
125+
def perform(job)
126+
self.class.lifecycle.run_callbacks(:perform, self, job) do
126127
metadata = {
127128
status: 'RUNNING',
128129
name: job.name,
@@ -209,10 +210,6 @@ def handle_failed_job(job, error)
209210
reschedule(job)
210211
end
211212

212-
def run_job(job)
213-
self.class.lifecycle.run_callbacks(:perform, self, job) { run(job) }
214-
end
215-
216213
# The backend adapter may return either a list or a single job
217214
# In some backends, this can be controlled with the `max_claims` config
218215
# Either way, we map this to an array of job instances

0 commit comments

Comments
 (0)