Skip to content

Commit 27acecb

Browse files
authored
Improve setup and shutdown of periodic_updater (#4737)
Puma shutdown can race with the background metrics TimerTasks, which can lead to non-deterministic termination of threads (including database connections). Use 'kill' instead of 'shutdown' to dismiss already enqueued tasks while still allowing in-progress tasks to complete. Then call 'wait_for_termination' on each task with a timeout of 1 second per task, and return 'true' only if every task terminated within its timeout. This enables accurate logging ('Successfully stopped periodic updates' vs. 'Failed to stop all periodic update tasks'). Instead of calling all 'update_*' methods explicitly at the beginning of 'setup_updates' (the 'update!' method is removed), configure the TimerTasks with 'run_now: true'. Also set 'interval_type: :fixed_rate' so that execution timing is deterministic and independent of task duration.
1 parent 7eb181a commit 27acecb

5 files changed

Lines changed: 120 additions & 121 deletions

File tree

lib/cloud_controller/metrics/periodic_updater.rb

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,27 @@ def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updat
1515
end
1616

1717
def setup_updates
18-
update!
1918
@update_tasks = []
20-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 600) { catch_error { update_user_count } }
21-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_length } }
22-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_load } }
23-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_failed_job_count } }
24-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_vitals } }
25-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_log_counts } }
26-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_task_stats } }
27-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_deploying_count } }
28-
@update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_webserver_stats } }
19+
setup_task(@update_tasks, 600, :update_user_count)
20+
setup_task(@update_tasks, 30, :update_job_queue_length)
21+
setup_task(@update_tasks, 30, :update_job_queue_load)
22+
setup_task(@update_tasks, 30, :update_failed_job_count)
23+
setup_task(@update_tasks, 30, :update_vitals)
24+
setup_task(@update_tasks, 30, :update_log_counts)
25+
setup_task(@update_tasks, 30, :update_task_stats)
26+
setup_task(@update_tasks, 30, :update_deploying_count)
27+
setup_task(@update_tasks, 30, :update_webserver_stats)
2928
@update_tasks.each(&:execute)
3029
end
3130

3231
def stop_updates
33-
return unless @update_tasks
32+
return true unless @update_tasks
3433

35-
@update_tasks.each(&:shutdown)
36-
end
34+
@update_tasks.each(&:kill) # in-progress tasks will be allowed to complete, enqueued tasks will be dismissed
35+
all_tasks_terminated = true
36+
@update_tasks.each { |task| task.wait_for_termination(1) || (all_tasks_terminated = false) } # wait up to 1 second for each task to terminate
3737

38-
def update!
39-
update_user_count
40-
update_job_queue_length
41-
update_job_queue_load
42-
update_failed_job_count
43-
update_vitals
44-
update_log_counts
45-
update_task_stats
46-
update_deploying_count
47-
update_webserver_stats
38+
all_tasks_terminated # true if all tasks terminated, false if any are still running
4839
end
4940

5041
def catch_error
@@ -172,5 +163,11 @@ def update_webserver_stats
172163
end
173164
@prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats)
174165
end
166+
167+
private
168+
169+
def setup_task(update_tasks, interval, method_name)
170+
update_tasks << Concurrent::TimerTask.new(execution_interval: interval, interval_type: :fixed_rate, run_now: true) { catch_error { send(method_name) } }
171+
end
175172
end
176173
end

lib/cloud_controller/runners/puma_runner.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,20 @@ def start!
8080
private
8181

8282
def stop_periodic_updates
83-
@periodic_updater&.stop_updates
84-
@logger.info('Successfully stopped periodic updates in after_stopped')
83+
return unless @periodic_updater
84+
85+
if @periodic_updater.stop_updates
86+
@logger.info('Successfully stopped periodic updates in after_stopped')
87+
else
88+
@logger.warn('Failed to stop all periodic update tasks in after_stopped')
89+
end
8590
rescue ThreadError
8691
at_exit do
87-
@periodic_updater&.stop_updates
88-
@logger.info('Successfully stopped periodic updates in at_exit')
92+
if @periodic_updater.stop_updates
93+
@logger.info('Successfully stopped periodic updates in at_exit')
94+
else
95+
@logger.warn('Failed to stop all periodic update tasks in at_exit')
96+
end
8997
end
9098
rescue StandardError => e
9199
@logger.error("Failed to stop periodic updates: #{e}\n#{e.backtrace&.join("\n")}")

spec/request/internal/metrics_spec.rb

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
10.times do
4141
VCAP::CloudController::User.make
4242
end
43-
CloudController::DependencyLocator.instance.periodic_updater.update!
43+
CloudController::DependencyLocator.instance.periodic_updater.update_user_count
4444
end
4545

4646
it 'reports the total number of users' do
@@ -54,7 +54,7 @@
5454

5555
context 'cc_vitals' do
5656
it 'reports vitals' do
57-
CloudController::DependencyLocator.instance.periodic_updater.update!
57+
CloudController::DependencyLocator.instance.periodic_updater.update_vitals
5858
get '/internal/v4/metrics', nil
5959

6060
expect(last_response.body).to match(/cc_vitals_num_cores [1-9][0-9]*\.\d+/)
@@ -71,7 +71,7 @@
7171
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.day })
7272
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day })
7373

74-
CloudController::DependencyLocator.instance.periodic_updater.update!
74+
CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_length
7575
end
7676

7777
after do
@@ -91,27 +91,11 @@
9191
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now })
9292
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now })
9393

94-
CloudController::DependencyLocator.instance.periodic_updater.update!
95-
end
96-
97-
after do
98-
Delayed::Job.dataset.delete
99-
end
100-
101-
it 'includes job queue load metric labelled for each queue' do
102-
get '/internal/v4/metrics', nil
103-
104-
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/)
105-
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/)
106-
end
107-
end
108-
109-
context 'cc_job_queue_load_not_ready_to_run_now' do
110-
before do
94+
# jobs with run_at in the future should not be counted towards load
11195
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.minute })
11296
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.minute })
11397

114-
CloudController::DependencyLocator.instance.periodic_updater.update!
98+
CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_load
11599
end
116100

117101
after do
@@ -121,8 +105,8 @@
121105
it 'includes job queue load metric labelled for each queue' do
122106
get '/internal/v4/metrics', nil
123107

124-
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 0\.0/)
125-
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 0\.0/)
108+
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/)
109+
expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/)
126110
end
127111
end
128112

@@ -132,7 +116,7 @@
132116
Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day })
133117
Delayed::Job.dataset.update(failed_at: Time.now.utc)
134118

135-
CloudController::DependencyLocator.instance.periodic_updater.update!
119+
CloudController::DependencyLocator.instance.periodic_updater.update_failed_job_count
136120
end
137121

138122
after do

spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb

Lines changed: 70 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -80,62 +80,23 @@ module VCAP::CloudController::Metrics
8080
allow(prometheus_updater).to receive(:update_job_queue_load)
8181
allow(prometheus_updater).to receive(:update_failed_job_count)
8282
allow(prometheus_updater).to receive(:update_vitals)
83-
allow(prometheus_updater).to receive(:update_log_counts)
8483
allow(prometheus_updater).to receive(:update_task_stats)
8584
allow(prometheus_updater).to receive(:update_deploying_count)
8685
allow(prometheus_updater).to receive(:update_webserver_stats_puma)
8786
end
8887

89-
it 'bumps the number of users and sets periodic timer' do
90-
expect(periodic_updater).to receive(:update_user_count).once
91-
periodic_updater.setup_updates
92-
end
93-
94-
it 'bumps the length of cc job queues and sets periodic timer' do
95-
expect(periodic_updater).to receive(:update_job_queue_length).once
96-
periodic_updater.setup_updates
97-
end
98-
99-
it 'bumps the load of cc job queues and sets periodic timer' do
100-
expect(periodic_updater).to receive(:update_job_queue_load).once
101-
periodic_updater.setup_updates
102-
end
103-
104-
it 'bumps the length of cc failed job queues and sets periodic timer' do
105-
expect(periodic_updater).to receive(:update_failed_job_count).once
106-
periodic_updater.setup_updates
107-
end
108-
109-
it 'updates the vitals' do
110-
expect(periodic_updater).to receive(:update_vitals).once
111-
periodic_updater.setup_updates
112-
end
113-
114-
it 'updates the log counts' do
115-
expect(periodic_updater).to receive(:update_log_counts).once
116-
periodic_updater.setup_updates
117-
end
118-
119-
it 'updates the task stats' do
120-
expect(periodic_updater).to receive(:update_task_stats).once
121-
periodic_updater.setup_updates
122-
end
123-
124-
it 'updates the deploying count' do
125-
expect(periodic_updater).to receive(:update_deploying_count).once
126-
periodic_updater.setup_updates
127-
end
128-
12988
context 'when Concurrent::TimerTasks are run' do
13089
before do
13190
@periodic_timers = []
13291

13392
allow(Concurrent::TimerTask).to receive(:new) do |opts, &block|
13493
@periodic_timers << {
13594
interval: opts[:execution_interval],
95+
type: opts[:interval_type],
96+
now: opts[:run_now],
13697
block: block
13798
}
138-
double('TimerTask', execute: nil, shutdown: nil, kill: nil, running?: false)
99+
double('TimerTask', execute: nil)
139100
end
140101

141102
periodic_updater.setup_updates
@@ -145,6 +106,8 @@ module VCAP::CloudController::Metrics
145106
expect(periodic_updater).to receive(:catch_error).once.and_call_original
146107
expect(periodic_updater).to receive(:update_user_count).once
147108
expect(@periodic_timers[0][:interval]).to eq(600)
109+
expect(@periodic_timers[0][:type]).to eq(:fixed_rate)
110+
expect(@periodic_timers[0][:now]).to be(true)
148111

149112
@periodic_timers[0][:block].call
150113
end
@@ -153,6 +116,8 @@ module VCAP::CloudController::Metrics
153116
expect(periodic_updater).to receive(:catch_error).once.and_call_original
154117
expect(periodic_updater).to receive(:update_job_queue_length).once
155118
expect(@periodic_timers[1][:interval]).to eq(30)
119+
expect(@periodic_timers[1][:type]).to eq(:fixed_rate)
120+
expect(@periodic_timers[1][:now]).to be(true)
156121

157122
@periodic_timers[1][:block].call
158123
end
@@ -161,6 +126,8 @@ module VCAP::CloudController::Metrics
161126
expect(periodic_updater).to receive(:catch_error).once.and_call_original
162127
expect(periodic_updater).to receive(:update_job_queue_load).once
163128
expect(@periodic_timers[2][:interval]).to eq(30)
129+
expect(@periodic_timers[2][:type]).to eq(:fixed_rate)
130+
expect(@periodic_timers[2][:now]).to be(true)
164131

165132
@periodic_timers[2][:block].call
166133
end
@@ -169,6 +136,8 @@ module VCAP::CloudController::Metrics
169136
expect(periodic_updater).to receive(:catch_error).once.and_call_original
170137
expect(periodic_updater).to receive(:update_failed_job_count).once
171138
expect(@periodic_timers[3][:interval]).to eq(30)
139+
expect(@periodic_timers[3][:type]).to eq(:fixed_rate)
140+
expect(@periodic_timers[3][:now]).to be(true)
172141

173142
@periodic_timers[3][:block].call
174143
end
@@ -177,6 +146,8 @@ module VCAP::CloudController::Metrics
177146
expect(periodic_updater).to receive(:catch_error).once.and_call_original
178147
expect(periodic_updater).to receive(:update_vitals).once
179148
expect(@periodic_timers[4][:interval]).to eq(30)
149+
expect(@periodic_timers[4][:type]).to eq(:fixed_rate)
150+
expect(@periodic_timers[4][:now]).to be(true)
180151

181152
@periodic_timers[4][:block].call
182153
end
@@ -185,6 +156,8 @@ module VCAP::CloudController::Metrics
185156
expect(periodic_updater).to receive(:catch_error).once.and_call_original
186157
expect(periodic_updater).to receive(:update_log_counts).once
187158
expect(@periodic_timers[5][:interval]).to eq(30)
159+
expect(@periodic_timers[5][:type]).to eq(:fixed_rate)
160+
expect(@periodic_timers[5][:now]).to be(true)
188161

189162
@periodic_timers[5][:block].call
190163
end
@@ -193,30 +166,75 @@ module VCAP::CloudController::Metrics
193166
expect(periodic_updater).to receive(:catch_error).once.and_call_original
194167
expect(periodic_updater).to receive(:update_task_stats).once
195168
expect(@periodic_timers[6][:interval]).to eq(30)
169+
expect(@periodic_timers[6][:type]).to eq(:fixed_rate)
170+
expect(@periodic_timers[6][:now]).to be(true)
196171

197172
@periodic_timers[6][:block].call
198173
end
174+
175+
it 'updates the deploying count' do
176+
expect(periodic_updater).to receive(:catch_error).once.and_call_original
177+
expect(periodic_updater).to receive(:update_deploying_count).once
178+
expect(@periodic_timers[7][:interval]).to eq(30)
179+
expect(@periodic_timers[7][:type]).to eq(:fixed_rate)
180+
expect(@periodic_timers[7][:now]).to be(true)
181+
182+
@periodic_timers[7][:block].call
183+
end
184+
185+
it 'updates the webserver stats' do
186+
expect(periodic_updater).to receive(:catch_error).once.and_call_original
187+
expect(periodic_updater).to receive(:update_webserver_stats).once
188+
expect(@periodic_timers[8][:interval]).to eq(30)
189+
expect(@periodic_timers[8][:type]).to eq(:fixed_rate)
190+
expect(@periodic_timers[8][:now]).to be(true)
191+
192+
@periodic_timers[8][:block].call
193+
end
199194
end
200195

201196
context 'when PeriodicUpdater is shut down' do
202197
it 'does nothing when updates have not been setup' do
203198
expect { periodic_updater.stop_updates }.not_to raise_error
204199
end
205200

206-
it 'shuts down all timer tasks after setup_updates' do
207-
timer_doubles = []
208-
allow(Concurrent::TimerTask).to receive(:new) do |*|
209-
dbl = double('TimerTask', execute: nil, shutdown: nil)
210-
timer_doubles << dbl
211-
dbl
201+
context 'when Concurrent::TimerTasks are stopped' do
202+
let(:tasks) { [] }
203+
let(:wait_for_termination_response) { nil }
204+
205+
before do
206+
allow(Concurrent::TimerTask).to receive(:new) do |*|
207+
dbl = double('TimerTask', execute: nil, kill: nil, wait_for_termination: wait_for_termination_response)
208+
tasks << dbl
209+
dbl
210+
end
211+
212+
periodic_updater.setup_updates
212213
end
213214

214-
periodic_updater.setup_updates
215+
context 'when tasks are terminated in time' do
216+
let(:wait_for_termination_response) { true }
215217

216-
expect(timer_doubles.size).to eq(9)
217-
expect(timer_doubles).to all(receive(:shutdown).once)
218+
it 'stops all tasks and returns true' do
219+
expect(tasks.size).to eq(9)
220+
expect(tasks).to all(receive(:kill).once)
221+
expect(tasks).to all(receive(:wait_for_termination).with(1).once)
218222

219-
periodic_updater.stop_updates
223+
expect(periodic_updater.stop_updates).to be(true)
224+
end
225+
end
226+
227+
context 'when tasks are not terminated in time' do
228+
let(:wait_for_termination_response) { false }
229+
230+
it 'stops all tasks and returns false' do
231+
expect(tasks.size).to eq(9)
232+
expect(tasks).to all(receive(:kill).once)
233+
expect(tasks).to all(receive(:wait_for_termination).with(1).once)
234+
235+
expect(periodic_updater.stop_updates).to be(false)
236+
end
237+
end
220238
end
221239
end
222240
end
@@ -642,22 +660,6 @@ module VCAP::CloudController::Metrics
642660
end
643661
end
644662

645-
describe '#update!' do
646-
it 'calls all update methods' do
647-
expect(periodic_updater).to receive(:update_user_count).once
648-
expect(periodic_updater).to receive(:update_job_queue_length).once
649-
expect(periodic_updater).to receive(:update_job_queue_load).once
650-
expect(periodic_updater).to receive(:update_failed_job_count).once
651-
expect(periodic_updater).to receive(:update_vitals).once
652-
expect(periodic_updater).to receive(:update_log_counts).once
653-
expect(periodic_updater).to receive(:update_task_stats).once
654-
expect(periodic_updater).to receive(:update_deploying_count).once
655-
expect(periodic_updater).to receive(:update_webserver_stats).once
656-
657-
periodic_updater.update!
658-
end
659-
end
660-
661663
describe '#catch_error' do
662664
it 'calls a block' do
663665
was_called = false

0 commit comments

Comments
 (0)