-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathsidekiq-queue-pause.rb
More file actions
89 lines (72 loc) · 2.13 KB
/
sidekiq-queue-pause.rb
File metadata and controls
89 lines (72 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
require "sidekiq"
require "sidekiq/fetch"
module Sidekiq
module QueuePause
PREFIX = "queue_pause"
class << self
  # Seconds PausingFetch sleeps before retrying when it has nothing to
  # fetch. When nil, PausingFetch falls back to Sidekiq::BasicFetch::TIMEOUT.
  attr_accessor :retry_after

  # Assigns the process key: either a plain value or a Proc, which
  # +process_key+ calls on every read.
  attr_writer :process_key

  # Reads or configures the key used to scope pauses to a process.
  #
  # With a block, stores the block as the process key and returns it.
  # Without a block, returns the stored value, calling it first when it
  # is a Proc.
  #
  # @return [Object, nil] the stored block, or the resolved process key
  def process_key(&block)
    return @process_key = block if block

    @process_key.is_a?(Proc) ? @process_key.call : @process_key
  end

  # Marks +queue+ as paused by setting its Redis key.
  #
  # @param queue [String] queue name
  # @param pkey [Object, nil] optional process key the pause is scoped to
  # @return [Object] the reply of the Redis +set+ call
  def pause(queue, pkey = nil)
    Sidekiq.redis { |conn| conn.set(rkey(queue, pkey), true) }
  end

  # Clears the pause marker of +queue+ by deleting its Redis key.
  #
  # @param queue [String] queue name
  # @param pkey [Object, nil] optional process key the pause is scoped to
  # @return [Object] the reply of the Redis +del+ call
  def unpause(queue, pkey = nil)
    Sidekiq.redis { |conn| conn.del(rkey(queue, pkey)) }
  end

  # Tells whether the pause key for +queue+ (and +pkey+, if given) exists.
  # Only that exact key is checked: a pause stored without a process key
  # is not consulted when +pkey+ is given.
  #
  # @param queue [String] queue name
  # @param pkey [Object, nil] optional process key the pause is scoped to
  # @return [Object] the reply of the Redis +exists?+ call
  def paused?(queue, pkey = nil)
    Sidekiq.redis { |conn| conn.exists?(rkey(queue, pkey)) }
  end

  # Removes every pause marker, global and process-scoped alike.
  #
  # All matching keys are deleted with a single +del+ call instead of one
  # round trip per key.
  #
  # NOTE(review): KEYS walks the whole keyspace; consider SCAN if the
  # Redis client in use exposes it.
  #
  # @return [Array<String>] the keys that matched "#{PREFIX}:*"
  def unpause_all
    Sidekiq.redis do |conn|
      keys = conn.keys("#{PREFIX}:*")
      # DEL needs at least one key, so skip the call when nothing matched.
      conn.del(*keys) unless keys.empty?
      keys
    end
  end

  private

  # Builds the Redis key holding the pause marker.
  #
  # @param queue [String] queue name
  # @param pkey [Object, nil] optional process key appended as a suffix
  # @return [String]
  def rkey(queue, pkey)
    pkey ? "#{PREFIX}:#{queue}:#{pkey}" : "#{PREFIX}:#{queue}"
  end
end
class PausingFetch < Sidekiq::BasicFetch
# Fetches the next unit of work from the queues that are not paused.
#
# The command from +unpaused_queues_cmd+ is only sent when it holds more
# than one element. Otherwise the fetcher sleeps for
# +Sidekiq::QueuePause.retry_after+ seconds (falling back to
# +Sidekiq::BasicFetch::TIMEOUT+) and reports that nothing was fetched.
#
# @return [Object, nil] the result of +retrieve_work_for_queues+, or nil
#   after sleeping
def retrieve_work
  fetch_args = unpaused_queues_cmd
  return retrieve_work_for_queues(fetch_args) if fetch_args.size > 1

  # Presumably a lone element is only the trailing timeout entry, i.e. no
  # queue is left to poll - verify against BasicFetch#queues_cmd.
  sleep(Sidekiq::QueuePause.retry_after || Sidekiq::BasicFetch::TIMEOUT)
  nil
end
# Issues +brpop+ with the given arguments and wraps a hit in a UnitOfWork.
#
# @param qcmd [Array] arguments splatted into +brpop+
# @return [UnitOfWork, nil] the fetched unit of work, or nil when +brpop+
#   returned no queue
def retrieve_work_for_queues(qcmd)
  # Disabled alternative, kept for reference:
  #   redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) }
  queue, job = redis { |connection| connection.brpop(*qcmd) }
  return unless queue

  UnitOfWork.new(queue, job, config)
end
# Returns the fetch command with every paused queue removed.
#
# Starts from +queues_cmd+ and drops each entry whose queue name is paused
# for the current +Sidekiq::QueuePause.process_key+. The queue name is the
# entry with its leading "queue:" prefix removed; only the prefix is
# stripped, so names that themselves contain "queue:" are looked up intact.
# Integer and Hash entries are always kept (presumably the trailing
# timeout argument - verify against BasicFetch#queues_cmd).
#
# @return [Array] the entries of +queues_cmd+ that are not paused
def unpaused_queues_cmd
  queues_cmd.reject do |entry|
    case entry
    when Integer, Hash
      false # not a queue key, never rejected
    else
      Sidekiq::QueuePause.paused?(entry.delete_prefix("queue:"), Sidekiq::QueuePause.process_key)
    end
  end
end
end
end
class Queue
  # Defines #pause, #unpause and #paused? on the queue. Each one forwards
  # to the Sidekiq::QueuePause method of the same name, passing this
  # queue's +name+ and the optional process key +pkey+ (nil by default),
  # and returns whatever that method returns.
  %i[pause unpause paused?].each do |action|
    define_method(action) do |pkey = nil|
      Sidekiq::QueuePause.public_send(action, name, pkey)
    end
  end
end
end