diff --git a/lib/cloud_controller/metrics/periodic_updater.rb b/lib/cloud_controller/metrics/periodic_updater.rb index b40ae97176..f7f793dffc 100644 --- a/lib/cloud_controller/metrics/periodic_updater.rb +++ b/lib/cloud_controller/metrics/periodic_updater.rb @@ -15,36 +15,27 @@ def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updat end def setup_updates - update! @update_tasks = [] - @update_tasks << Concurrent::TimerTask.new(execution_interval: 600) { catch_error { update_user_count } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_length } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_load } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_failed_job_count } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_vitals } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_log_counts } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_task_stats } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_deploying_count } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_webserver_stats } } + setup_task(@update_tasks, 600, :update_user_count) + setup_task(@update_tasks, 30, :update_job_queue_length) + setup_task(@update_tasks, 30, :update_job_queue_load) + setup_task(@update_tasks, 30, :update_failed_job_count) + setup_task(@update_tasks, 30, :update_vitals) + setup_task(@update_tasks, 30, :update_log_counts) + setup_task(@update_tasks, 30, :update_task_stats) + setup_task(@update_tasks, 30, :update_deploying_count) + setup_task(@update_tasks, 30, :update_webserver_stats) @update_tasks.each(&:execute) end def stop_updates - return unless @update_tasks + return true unless @update_tasks - @update_tasks.each(&:shutdown) - end + @update_tasks.each(&:kill) # in-progress tasks will be allowed to complete, enqueued tasks will be dismissed + all_tasks_terminated = true + @update_tasks.each { |task| task.wait_for_termination(1) || (all_tasks_terminated = false) } # wait up to 1 second for each task to terminate - def update! - update_user_count - update_job_queue_length - update_job_queue_load - update_failed_job_count - update_vitals - update_log_counts - update_task_stats - update_deploying_count - update_webserver_stats + all_tasks_terminated # true if all tasks terminated, false if any are still running end def catch_error @@ -172,5 +163,11 @@ def update_webserver_stats end @prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats) end + + private + + def setup_task(update_tasks, interval, method_name) + update_tasks << Concurrent::TimerTask.new(execution_interval: interval, interval_type: :fixed_rate, run_now: true) { catch_error { send(method_name) } } + end end end diff --git a/lib/cloud_controller/runners/puma_runner.rb b/lib/cloud_controller/runners/puma_runner.rb index bc436062ed..09344f6776 100644 --- a/lib/cloud_controller/runners/puma_runner.rb +++ b/lib/cloud_controller/runners/puma_runner.rb @@ -80,12 +80,20 @@ def start! private def stop_periodic_updates - @periodic_updater&.stop_updates - @logger.info('Successfully stopped periodic updates in after_stopped') + return unless @periodic_updater + + if @periodic_updater.stop_updates + @logger.info('Successfully stopped periodic updates in after_stopped') + else + @logger.warn('Failed to stop all periodic update tasks in after_stopped') + end rescue ThreadError at_exit do - @periodic_updater&.stop_updates - @logger.info('Successfully stopped periodic updates in at_exit') + if @periodic_updater.stop_updates + @logger.info('Successfully stopped periodic updates in at_exit') + else + @logger.warn('Failed to stop all periodic update tasks in at_exit') + end end rescue StandardError => e @logger.error("Failed to stop periodic updates: #{e}\n#{e.backtrace&.join("\n")}") diff --git a/spec/request/internal/metrics_spec.rb b/spec/request/internal/metrics_spec.rb index 221354c3c2..c15659ec0d 100644 --- a/spec/request/internal/metrics_spec.rb +++ b/spec/request/internal/metrics_spec.rb @@ -40,7 +40,7 @@ 10.times do VCAP::CloudController::User.make end - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_user_count end it 'reports the total number of users' do @@ -54,7 +54,7 @@ context 'cc_vitals' do it 'reports vitals' do - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_vitals get '/internal/v4/metrics', nil expect(last_response.body).to match(/cc_vitals_num_cores [1-9][0-9]*\.\d+/) @@ -71,7 +71,7 @@ Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.day }) Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day }) - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_length end after do @@ -91,27 +91,11 @@ Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now }) Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now }) - CloudController::DependencyLocator.instance.periodic_updater.update! - end - - after do - Delayed::Job.dataset.delete - end - - it 'includes job queue load metric labelled for each queue' do - get '/internal/v4/metrics', nil - - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/) - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/) - end - end - - context 'cc_job_queue_load_not_ready_to_run_now' do - before do + # jobs with run_at in the future should not be counted towards load Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.minute }) Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.minute }) - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_load end after do @@ -121,8 +105,8 @@ it 'includes job queue load metric labelled for each queue' do get '/internal/v4/metrics', nil - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 0\.0/) - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 0\.0/) + expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/) + expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/) end end @@ -132,7 +116,7 @@ Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day }) Delayed::Job.dataset.update(failed_at: Time.now.utc) - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_failed_job_count end after do diff --git a/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb b/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb index da96a56260..41d0b9fca3 100644 --- a/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb +++ b/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb @@ -80,52 +80,11 @@ module VCAP::CloudController::Metrics allow(prometheus_updater).to receive(:update_job_queue_load) allow(prometheus_updater).to receive(:update_failed_job_count) allow(prometheus_updater).to receive(:update_vitals) - allow(prometheus_updater).to receive(:update_log_counts) allow(prometheus_updater).to receive(:update_task_stats) allow(prometheus_updater).to receive(:update_deploying_count) allow(prometheus_updater).to receive(:update_webserver_stats_puma) end - it 'bumps the number of users and sets periodic timer' do - expect(periodic_updater).to receive(:update_user_count).once - periodic_updater.setup_updates - end - - it 'bumps the length of cc job queues and sets periodic timer' do - expect(periodic_updater).to receive(:update_job_queue_length).once - periodic_updater.setup_updates - end - - it 'bumps the load of cc job queues and sets periodic timer' do - expect(periodic_updater).to receive(:update_job_queue_load).once - periodic_updater.setup_updates - end - - it 'bumps the length of cc failed job queues and sets periodic timer' do - expect(periodic_updater).to receive(:update_failed_job_count).once - periodic_updater.setup_updates - end - - it 'updates the vitals' do - expect(periodic_updater).to receive(:update_vitals).once - periodic_updater.setup_updates - end - - it 'updates the log counts' do - expect(periodic_updater).to receive(:update_log_counts).once - periodic_updater.setup_updates - end - - it 'updates the task stats' do - expect(periodic_updater).to receive(:update_task_stats).once - periodic_updater.setup_updates - end - - it 'updates the deploying count' do - expect(periodic_updater).to receive(:update_deploying_count).once - periodic_updater.setup_updates - end - context 'when Concurrent::TimerTasks are run' do before do @periodic_timers = [] @@ -133,9 +92,11 @@ module VCAP::CloudController::Metrics allow(Concurrent::TimerTask).to receive(:new) do |opts, &block| @periodic_timers << { interval: opts[:execution_interval], + type: opts[:interval_type], + now: opts[:run_now], block: block } - double('TimerTask', execute: nil, shutdown: nil, kill: nil, running?: false) + double('TimerTask', execute: nil) end periodic_updater.setup_updates @@ -145,6 +106,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_user_count).once expect(@periodic_timers[0][:interval]).to eq(600) + expect(@periodic_timers[0][:type]).to eq(:fixed_rate) + expect(@periodic_timers[0][:now]).to be(true) @periodic_timers[0][:block].call end @@ -153,6 +116,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_job_queue_length).once expect(@periodic_timers[1][:interval]).to eq(30) + expect(@periodic_timers[1][:type]).to eq(:fixed_rate) + expect(@periodic_timers[1][:now]).to be(true) @periodic_timers[1][:block].call end @@ -161,6 +126,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_job_queue_load).once expect(@periodic_timers[2][:interval]).to eq(30) + expect(@periodic_timers[2][:type]).to eq(:fixed_rate) + expect(@periodic_timers[2][:now]).to be(true) @periodic_timers[2][:block].call end @@ -169,6 +136,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_failed_job_count).once expect(@periodic_timers[3][:interval]).to eq(30) + expect(@periodic_timers[3][:type]).to eq(:fixed_rate) + expect(@periodic_timers[3][:now]).to be(true) @periodic_timers[3][:block].call end @@ -177,6 +146,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_vitals).once expect(@periodic_timers[4][:interval]).to eq(30) + expect(@periodic_timers[4][:type]).to eq(:fixed_rate) + expect(@periodic_timers[4][:now]).to be(true) @periodic_timers[4][:block].call end @@ -185,6 +156,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_log_counts).once expect(@periodic_timers[5][:interval]).to eq(30) + expect(@periodic_timers[5][:type]).to eq(:fixed_rate) + expect(@periodic_timers[5][:now]).to be(true) @periodic_timers[5][:block].call end @@ -193,9 +166,31 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_task_stats).once expect(@periodic_timers[6][:interval]).to eq(30) + expect(@periodic_timers[6][:type]).to eq(:fixed_rate) + expect(@periodic_timers[6][:now]).to be(true) @periodic_timers[6][:block].call end + + it 'updates the deploying count' do + expect(periodic_updater).to receive(:catch_error).once.and_call_original + expect(periodic_updater).to receive(:update_deploying_count).once + expect(@periodic_timers[7][:interval]).to eq(30) + expect(@periodic_timers[7][:type]).to eq(:fixed_rate) + expect(@periodic_timers[7][:now]).to be(true) + + @periodic_timers[7][:block].call + end + + it 'updates the webserver stats' do + expect(periodic_updater).to receive(:catch_error).once.and_call_original + expect(periodic_updater).to receive(:update_webserver_stats).once + expect(@periodic_timers[8][:interval]).to eq(30) + expect(@periodic_timers[8][:type]).to eq(:fixed_rate) + expect(@periodic_timers[8][:now]).to be(true) + + @periodic_timers[8][:block].call + end end context 'when PeriodicUpdater is shut down' do @@ -203,20 +198,43 @@ module VCAP::CloudController::Metrics expect { periodic_updater.stop_updates }.not_to raise_error end - it 'shuts down all timer tasks after setup_updates' do - timer_doubles = [] - allow(Concurrent::TimerTask).to receive(:new) do |*| - dbl = double('TimerTask', execute: nil, shutdown: nil) - timer_doubles << dbl - dbl + context 'when Concurrent::TimerTasks are stopped' do + let(:tasks) { [] } + let(:wait_for_termination_response) { nil } + + before do + allow(Concurrent::TimerTask).to receive(:new) do |*| + dbl = double('TimerTask', execute: nil, kill: nil, wait_for_termination: wait_for_termination_response) + tasks << dbl + dbl + end + + periodic_updater.setup_updates end - periodic_updater.setup_updates + context 'when tasks are terminated in time' do + let(:wait_for_termination_response) { true } - expect(timer_doubles.size).to eq(9) - expect(timer_doubles).to all(receive(:shutdown).once) + it 'stops all tasks and returns true' do + expect(tasks.size).to eq(9) + expect(tasks).to all(receive(:kill).once) + expect(tasks).to all(receive(:wait_for_termination).with(1).once) - periodic_updater.stop_updates + expect(periodic_updater.stop_updates).to be(true) + end + end + + context 'when tasks are not terminated in time' do + let(:wait_for_termination_response) { false } + + it 'stops all tasks and returns false' do + expect(tasks.size).to eq(9) + expect(tasks).to all(receive(:kill).once) + expect(tasks).to all(receive(:wait_for_termination).with(1).once) + + expect(periodic_updater.stop_updates).to be(false) + end + end end end end @@ -642,22 +660,6 @@ module VCAP::CloudController::Metrics end end - describe '#update!' do - it 'calls all update methods' do - expect(periodic_updater).to receive(:update_user_count).once - expect(periodic_updater).to receive(:update_job_queue_length).once - expect(periodic_updater).to receive(:update_job_queue_load).once - expect(periodic_updater).to receive(:update_failed_job_count).once - expect(periodic_updater).to receive(:update_vitals).once - expect(periodic_updater).to receive(:update_log_counts).once - expect(periodic_updater).to receive(:update_task_stats).once - expect(periodic_updater).to receive(:update_deploying_count).once - expect(periodic_updater).to receive(:update_webserver_stats).once - - periodic_updater.update! - end - end - describe '#catch_error' do it 'calls a block' do was_called = false diff --git a/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb b/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb index 40199689cc..31b62c6334 100644 --- a/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb +++ b/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb @@ -204,8 +204,16 @@ module VCAP::CloudController end describe 'after_stopped' do - it 'stops the TimerTasks and logs incomplete requests' do - expect(periodic_updater).to receive(:stop_updates) + it 'stops the TimerTasks' do + expect(periodic_updater).to receive(:stop_updates).and_return(true) + expect(logger).to receive(:info).with(/Successfully stopped periodic updates/) + + puma_launcher.events.fire(:after_stopped) + end + + it 'logs a warning if stopping the TimerTasks fails' do + expect(periodic_updater).to receive(:stop_updates).and_return(false) + expect(logger).to receive(:warn).with(/Failed to stop all periodic update tasks/) puma_launcher.events.fire(:after_stopped) end