Skip to content

Commit acbc670

Browse files
author
Conor Pappas
committed
Refactors handling redis disconnects and uses it for the watcher thread as well
1 parent 668e522 commit acbc670

1 file changed

Lines changed: 25 additions & 16 deletions

File tree

lib/resque_stuck_queue.rb

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,22 +171,25 @@ def setup_watcher_thread
171171
Thread.current.abort_on_exception = abort_on_exception
172172
log_starting_thread(:watcher)
173173
while @running
174-
mutex = RedisMutex.new(:resque_stuck_queue, :block => 0)
175-
if mutex.lock
176-
begin
177-
queues.each do |queue_name|
178-
log_watcher_info(queue_name)
179-
if should_trigger?(queue_name)
180-
trigger_handler(queue_name, :triggered)
181-
elsif should_recover?(queue_name)
182-
trigger_handler(queue_name, :recovered)
174+
error_lambda = lambda { |e| "Watcher thread couldn't access redis: #{e.inspect}" }
175+
handle_redis_disconnect(error_lambda) do
176+
mutex = RedisMutex.new(:resque_stuck_queue, :block => 0)
177+
if mutex.lock
178+
begin
179+
queues.each do |queue_name|
180+
log_watcher_info(queue_name)
181+
if should_trigger?(queue_name)
182+
trigger_handler(queue_name, :triggered)
183+
elsif should_recover?(queue_name)
184+
trigger_handler(queue_name, :recovered)
185+
end
183186
end
187+
ensure
188+
mutex.unlock
184189
end
185-
ensure
186-
mutex.unlock
187190
end
191+
wait_for_it(:watcher_interval)
188192
end
189-
wait_for_it(:watcher_interval)
190193
end
191194
end
192195
end
@@ -227,13 +230,11 @@ def enqueue_jobs
227230
config[:heartbeat_job].call
228231
else
229232
queues.each do |queue_name|
230-
begin
233+
error_lambda = lambda { |e| "Enqueuing heartbeat job for #{queue_name} crashed: #{e.inspect}" }
234+
handle_redis_disconnect(error_lambda) do
231235
# Redis::Namespace.new support as well as Redis.new
232236
namespace = redis.respond_to?(:namespace) ? redis.namespace : nil
233237
Resque.enqueue_to(queue_name, HeartbeatJob, heartbeat_key_for(queue_name), redis.client.host, redis.client.port, namespace, Time.now.to_i )
234-
rescue Redis::CannotConnectError => e
235-
logger.error("Enqueuing heartbeat job for #{queue_name} crashed: #{e.inspect}")
236-
logger.error("\n#{e.backtrace.join("\n")}")
237238
end
238239
end
239240
end
@@ -319,6 +320,14 @@ def pretty_process_name
319320
$0 = "rake --trace resque:stuck_queue #{redis.inspect} QUEUES=#{queues.join(",")}"
320321
end
321322

323+
def handle_redis_disconnect(error_message)
324+
yield
325+
rescue Redis::BaseError, SocketError => e
326+
message = error_message.respond_to?(:call) ? error_message.call : error_message
327+
logger.error(message)
328+
logger.error("\n#{e.backtrace.join("\n")}")
329+
raise e unless config[:redis_disconnect_recovery]
330+
end
322331
end
323332
end
324333
end

0 commit comments

Comments
 (0)