Skip to content

Commit fd97a38

Browse files
authored
Fix spin-loop/cleanup failure mode within run loop (#42)
This ensures that exceptions raised in thread callback hooks are rescued and that the affected jobs are properly marked as failed. This is also a good opportunity to change the `num` argument (of `work_off(num)`) to mean the number of jobs (give or take a few due to `max_claims`), not the number of iterations. Previously (before threading was introduced) I think it meant the number of jobs (though jobs and iterations were 1:1). I would not have done this before the refactor, because there was no guarantee that one of `success` or `failure` would be incremented (the thread might crash for many reasons). Now, we only increment `success` and treat `total - success` as the "failure" number when we return from the method. Fixes #23 and #41. This is also a prereq for a resolution I'm cooking up for #36.
1 parent 24b75f5 commit fd97a38

10 files changed

Lines changed: 33 additions & 35 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
strategy:
99
fail-fast: false
1010
matrix:
11-
ruby: ['2.6', '2.7', '3.0', '3.1', '3.2']
11+
ruby: ['2.7', '3.0', '3.1', '3.2']
1212
gemfile:
1313
- gemfiles/rails_5_2.gemfile
1414
- gemfiles/rails_6_0.gemfile

Gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ gem 'mysql2'
1111
gem 'pg'
1212
gem 'rake'
1313
gem 'rspec'
14-
gem 'sqlite3'
14+
gem 'sqlite3', '~> 1.7.3'
1515
gem 'timecop'
1616
gem 'zeitwerk'

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ DEPENDENCIES
216216
pg
217217
rake
218218
rspec
219-
sqlite3
219+
sqlite3 (~> 1.7.3)
220220
timecop
221221
zeitwerk
222222

gemfiles/rails_5_2.gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ gem "mysql2"
1111
gem "pg"
1212
gem "rake"
1313
gem "rspec"
14-
gem "sqlite3"
14+
gem "sqlite3", "~> 1.7.3"
1515
gem "timecop"
1616
gem "zeitwerk"
1717

gemfiles/rails_6_0.gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ gem "mysql2"
1111
gem "pg"
1212
gem "rake"
1313
gem "rspec"
14-
gem "sqlite3"
14+
gem "sqlite3", "~> 1.7.3"
1515
gem "timecop"
1616
gem "zeitwerk"
1717

gemfiles/rails_6_1.gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ gem "mysql2"
1111
gem "pg"
1212
gem "rake"
1313
gem "rspec"
14-
gem "sqlite3"
14+
gem "sqlite3", "~> 1.7.3"
1515
gem "timecop"
1616
gem "zeitwerk"
1717

gemfiles/rails_7_0.gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ gem "mysql2"
1111
gem "pg"
1212
gem "rake"
1313
gem "rspec"
14-
gem "sqlite3"
14+
gem "sqlite3", "~> 1.7.3"
1515
gem "timecop"
1616
gem "zeitwerk"
1717

gemfiles/rails_7_1.gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ gem "mysql2"
1111
gem "pg"
1212
gem "rake"
1313
gem "rspec"
14-
gem "sqlite3"
14+
gem "sqlite3", "~> 1.7.3"
1515
gem "timecop"
1616
gem "zeitwerk"
1717

gemfiles/rails_main.gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ gem "mysql2"
1111
gem "pg"
1212
gem "rake"
1313
gem "rspec"
14-
gem "sqlite3"
14+
gem "sqlite3", "~> 1.7.3"
1515
gem "timecop"
1616
gem "zeitwerk"
1717
gem "actionview", github: "rails/rails", glob: "actionview/*.gemspec"

lib/delayed/worker.rb

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,17 @@ def on_exit!
8989
# Exit early if interrupted.
9090
def work_off(num = 100)
9191
success = Concurrent::AtomicFixnum.new(0)
92-
failure = Concurrent::AtomicFixnum.new(0)
92+
total = 0
9393

94-
num.times do
94+
while total < num
9595
jobs = reserve_jobs
9696
break if jobs.empty?
9797

98+
total += jobs.length
9899
pool = Concurrent::FixedThreadPool.new(jobs.length)
99100
jobs.each do |job|
100101
pool.post do
101-
run_thread_callbacks(job) do
102-
if run_job(job)
103-
success.increment
104-
else
105-
failure.increment
106-
end
107-
end
102+
success.increment if run_job(job)
108103
end
109104
end
110105

@@ -114,38 +109,41 @@ def work_off(num = 100)
114109
break if stop? # leave if we're exiting
115110
end
116111

117-
[success, failure].map(&:value)
112+
[success.value, total - success.value]
118113
end
119114

120115
def run_thread_callbacks(job, &block)
121116
self.class.lifecycle.run_callbacks(:thread, self, job, &block)
122117
end
123118

124119
def run(job)
125-
metadata = {
126-
status: 'RUNNING',
127-
name: job.name,
128-
run_at: job.run_at,
129-
created_at: job.created_at,
130-
priority: job.priority,
131-
queue: job.queue,
132-
attempts: job.attempts,
133-
enqueued_for: (Time.current - job.created_at).round,
134-
}
135-
job_say job, metadata.to_json
136-
run_time = Benchmark.realtime do
137-
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
138-
job.invoke_job
120+
run_thread_callbacks(job) do
121+
metadata = {
122+
status: 'RUNNING',
123+
name: job.name,
124+
run_at: job.run_at,
125+
created_at: job.created_at,
126+
priority: job.priority,
127+
queue: job.queue,
128+
attempts: job.attempts,
129+
enqueued_for: (Time.current - job.created_at).round,
130+
}
131+
job_say job, metadata.to_json
132+
run_time = Benchmark.realtime do
133+
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
134+
job.invoke_job
135+
end
136+
job.destroy
139137
end
140-
job.destroy
138+
job_say job, format('COMPLETED after %.4f seconds', run_time)
141139
end
142-
job_say job, format('COMPLETED after %.4f seconds', run_time)
143140
true # did work
144141
rescue DeserializationError => e
145142
job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error'
146143

147144
job.error = e
148145
failed(job)
146+
false # work failed
149147
rescue Exception => e # rubocop:disable Lint/RescueException
150148
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) }
151149
false # work failed

0 commit comments

Comments
 (0)