From 1a3590f07056f668ee469f52180990e4ceb84140 Mon Sep 17 00:00:00 2001 From: Philipp Thun Date: Thu, 8 Jan 2026 12:33:43 +0100 Subject: [PATCH] Improve setup and shutdown of periodic_updater Puma shutdown can race with background metrics TimerTasks, which can lead to non-deterministic termination of threads (including database connections). Use 'kill' instead of 'shutdown' to dismiss already enqueued tasks while still allowing in-progress tasks to complete. Then call 'wait_for_termination' with a timeout of 1 second and return 'true' only if all tasks terminated successfully. This enables accurate logging (Successfully stopped periodic updates vs. Failed to stop all periodic update tasks). Instead of calling all 'update_*' methods explicitly in the beginning, configure TimerTasks with 'run_now: true'. Also set 'interval_type: :fixed_rate' so execution timing is deterministic and independent of task duration. --- .../metrics/periodic_updater.rb | 43 +++--- lib/cloud_controller/runners/puma_runner.rb | 16 +- spec/request/internal/metrics_spec.rb | 32 +--- .../metrics/periodic_updater_spec.rb | 138 +++++++++--------- .../runners/puma_runner_spec.rb | 12 +- 5 files changed, 120 insertions(+), 121 deletions(-) diff --git a/lib/cloud_controller/metrics/periodic_updater.rb b/lib/cloud_controller/metrics/periodic_updater.rb index b40ae97176..f7f793dffc 100644 --- a/lib/cloud_controller/metrics/periodic_updater.rb +++ b/lib/cloud_controller/metrics/periodic_updater.rb @@ -15,36 +15,27 @@ def initialize(start_time, log_counter, logger, statsd_updater, prometheus_updat end def setup_updates - update! @update_tasks = [] - @update_tasks << Concurrent::TimerTask.new(execution_interval: 600) { catch_error { update_user_count } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_length } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_job_queue_load } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_failed_job_count } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_vitals } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_log_counts } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_task_stats } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_deploying_count } } - @update_tasks << Concurrent::TimerTask.new(execution_interval: 30) { catch_error { update_webserver_stats } } + setup_task(@update_tasks, 600, :update_user_count) + setup_task(@update_tasks, 30, :update_job_queue_length) + setup_task(@update_tasks, 30, :update_job_queue_load) + setup_task(@update_tasks, 30, :update_failed_job_count) + setup_task(@update_tasks, 30, :update_vitals) + setup_task(@update_tasks, 30, :update_log_counts) + setup_task(@update_tasks, 30, :update_task_stats) + setup_task(@update_tasks, 30, :update_deploying_count) + setup_task(@update_tasks, 30, :update_webserver_stats) @update_tasks.each(&:execute) end def stop_updates - return unless @update_tasks + return true unless @update_tasks - @update_tasks.each(&:shutdown) - end + @update_tasks.each(&:kill) # in-progress tasks will be allowed to complete, enqueued tasks will be dismissed + all_tasks_terminated = true + @update_tasks.each { |task| task.wait_for_termination(1) || (all_tasks_terminated = false) } # wait up to 1 second for each task to terminate - def update! - update_user_count - update_job_queue_length - update_job_queue_load - update_failed_job_count - update_vitals - update_log_counts - update_task_stats - update_deploying_count - update_webserver_stats + all_tasks_terminated # true if all tasks terminated, false if any are still running end def catch_error @@ -172,5 +163,11 @@ def update_webserver_stats end @prometheus_updater.update_webserver_stats_puma(worker_count, worker_stats) end + + private + + def setup_task(update_tasks, interval, method_name) + update_tasks << Concurrent::TimerTask.new(execution_interval: interval, interval_type: :fixed_rate, run_now: true) { catch_error { send(method_name) } } + end end end diff --git a/lib/cloud_controller/runners/puma_runner.rb b/lib/cloud_controller/runners/puma_runner.rb index bc436062ed..09344f6776 100644 --- a/lib/cloud_controller/runners/puma_runner.rb +++ b/lib/cloud_controller/runners/puma_runner.rb @@ -80,12 +80,20 @@ def start! private def stop_periodic_updates - @periodic_updater&.stop_updates - @logger.info('Successfully stopped periodic updates in after_stopped') + return unless @periodic_updater + + if @periodic_updater.stop_updates + @logger.info('Successfully stopped periodic updates in after_stopped') + else + @logger.warn('Failed to stop all periodic update tasks in after_stopped') + end rescue ThreadError at_exit do - @periodic_updater&.stop_updates - @logger.info('Successfully stopped periodic updates in at_exit') + if @periodic_updater.stop_updates + @logger.info('Successfully stopped periodic updates in at_exit') + else + @logger.warn('Failed to stop all periodic update tasks in at_exit') + end end rescue StandardError => e @logger.error("Failed to stop periodic updates: #{e}\n#{e.backtrace&.join("\n")}") diff --git a/spec/request/internal/metrics_spec.rb b/spec/request/internal/metrics_spec.rb index 221354c3c2..c15659ec0d 100644 --- a/spec/request/internal/metrics_spec.rb +++ b/spec/request/internal/metrics_spec.rb @@ -40,7 +40,7 @@ 10.times do VCAP::CloudController::User.make end - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_user_count end it 'reports the total number of users' do @@ -54,7 +54,7 @@ context 'cc_vitals' do it 'reports vitals' do - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_vitals get '/internal/v4/metrics', nil expect(last_response.body).to match(/cc_vitals_num_cores [1-9][0-9]*\.\d+/) @@ -71,7 +71,7 @@ Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.day }) Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day }) - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_length end after do @@ -91,27 +91,11 @@ Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now }) Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now }) - CloudController::DependencyLocator.instance.periodic_updater.update! - end - - after do - Delayed::Job.dataset.delete - end - - it 'includes job queue load metric labelled for each queue' do - get '/internal/v4/metrics', nil - - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/) - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/) - end - end - - context 'cc_job_queue_load_not_ready_to_run_now' do - before do + # jobs with run_at in the future should not be counted towards load Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_api_0', run_at: Time.now + 1.minute }) Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.minute }) - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_job_queue_load end after do @@ -121,8 +105,8 @@ it 'includes job queue load metric labelled for each queue' do get '/internal/v4/metrics', nil - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 0\.0/) - expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 0\.0/) + expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_api_0"} 1\.0/) + expect(last_response.body).to match(/cc_job_queues_load_total{queue="cc_generic"} 1\.0/) end end @@ -132,7 +116,7 @@ Delayed::Job.enqueue(VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(1), { queue: 'cc_generic', run_at: Time.now + 1.day }) Delayed::Job.dataset.update(failed_at: Time.now.utc) - CloudController::DependencyLocator.instance.periodic_updater.update! + CloudController::DependencyLocator.instance.periodic_updater.update_failed_job_count end after do diff --git a/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb b/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb index da96a56260..41d0b9fca3 100644 --- a/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb +++ b/spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb @@ -80,52 +80,11 @@ module VCAP::CloudController::Metrics allow(prometheus_updater).to receive(:update_job_queue_load) allow(prometheus_updater).to receive(:update_failed_job_count) allow(prometheus_updater).to receive(:update_vitals) - allow(prometheus_updater).to receive(:update_log_counts) allow(prometheus_updater).to receive(:update_task_stats) allow(prometheus_updater).to receive(:update_deploying_count) allow(prometheus_updater).to receive(:update_webserver_stats_puma) end - it 'bumps the number of users and sets periodic timer' do - expect(periodic_updater).to receive(:update_user_count).once - periodic_updater.setup_updates - end - - it 'bumps the length of cc job queues and sets periodic timer' do - expect(periodic_updater).to receive(:update_job_queue_length).once - periodic_updater.setup_updates - end - - it 'bumps the load of cc job queues and sets periodic timer' do - expect(periodic_updater).to receive(:update_job_queue_load).once - periodic_updater.setup_updates - end - - it 'bumps the length of cc failed job queues and sets periodic timer' do - expect(periodic_updater).to receive(:update_failed_job_count).once - periodic_updater.setup_updates - end - - it 'updates the vitals' do - expect(periodic_updater).to receive(:update_vitals).once - periodic_updater.setup_updates - end - - it 'updates the log counts' do - expect(periodic_updater).to receive(:update_log_counts).once - periodic_updater.setup_updates - end - - it 'updates the task stats' do - expect(periodic_updater).to receive(:update_task_stats).once - periodic_updater.setup_updates - end - - it 'updates the deploying count' do - expect(periodic_updater).to receive(:update_deploying_count).once - periodic_updater.setup_updates - end - context 'when Concurrent::TimerTasks are run' do before do @periodic_timers = [] @@ -133,9 +92,11 @@ module VCAP::CloudController::Metrics allow(Concurrent::TimerTask).to receive(:new) do |opts, &block| @periodic_timers << { interval: opts[:execution_interval], + type: opts[:interval_type], + now: opts[:run_now], block: block } - double('TimerTask', execute: nil, shutdown: nil, kill: nil, running?: false) + double('TimerTask', execute: nil) end periodic_updater.setup_updates @@ -145,6 +106,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_user_count).once expect(@periodic_timers[0][:interval]).to eq(600) + expect(@periodic_timers[0][:type]).to eq(:fixed_rate) + expect(@periodic_timers[0][:now]).to be(true) @periodic_timers[0][:block].call end @@ -153,6 +116,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_job_queue_length).once expect(@periodic_timers[1][:interval]).to eq(30) + expect(@periodic_timers[1][:type]).to eq(:fixed_rate) + expect(@periodic_timers[1][:now]).to be(true) @periodic_timers[1][:block].call end @@ -161,6 +126,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_job_queue_load).once expect(@periodic_timers[2][:interval]).to eq(30) + expect(@periodic_timers[2][:type]).to eq(:fixed_rate) + expect(@periodic_timers[2][:now]).to be(true) @periodic_timers[2][:block].call end @@ -169,6 +136,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_failed_job_count).once expect(@periodic_timers[3][:interval]).to eq(30) + expect(@periodic_timers[3][:type]).to eq(:fixed_rate) + expect(@periodic_timers[3][:now]).to be(true) @periodic_timers[3][:block].call end @@ -177,6 +146,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_vitals).once expect(@periodic_timers[4][:interval]).to eq(30) + expect(@periodic_timers[4][:type]).to eq(:fixed_rate) + expect(@periodic_timers[4][:now]).to be(true) @periodic_timers[4][:block].call end @@ -185,6 +156,8 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_log_counts).once expect(@periodic_timers[5][:interval]).to eq(30) + expect(@periodic_timers[5][:type]).to eq(:fixed_rate) + expect(@periodic_timers[5][:now]).to be(true) @periodic_timers[5][:block].call end @@ -193,9 +166,31 @@ module VCAP::CloudController::Metrics expect(periodic_updater).to receive(:catch_error).once.and_call_original expect(periodic_updater).to receive(:update_task_stats).once expect(@periodic_timers[6][:interval]).to eq(30) + expect(@periodic_timers[6][:type]).to eq(:fixed_rate) + expect(@periodic_timers[6][:now]).to be(true) @periodic_timers[6][:block].call end + + it 'updates the deploying count' do + expect(periodic_updater).to receive(:catch_error).once.and_call_original + expect(periodic_updater).to receive(:update_deploying_count).once + expect(@periodic_timers[7][:interval]).to eq(30) + expect(@periodic_timers[7][:type]).to eq(:fixed_rate) + expect(@periodic_timers[7][:now]).to be(true) + + @periodic_timers[7][:block].call + end + + it 'updates the webserver stats' do + expect(periodic_updater).to receive(:catch_error).once.and_call_original + expect(periodic_updater).to receive(:update_webserver_stats).once + expect(@periodic_timers[8][:interval]).to eq(30) + expect(@periodic_timers[8][:type]).to eq(:fixed_rate) + expect(@periodic_timers[8][:now]).to be(true) + + @periodic_timers[8][:block].call + end end context 'when PeriodicUpdater is shut down' do @@ -203,20 +198,43 @@ module VCAP::CloudController::Metrics expect { periodic_updater.stop_updates }.not_to raise_error end - it 'shuts down all timer tasks after setup_updates' do - timer_doubles = [] - allow(Concurrent::TimerTask).to receive(:new) do |*| - dbl = double('TimerTask', execute: nil, shutdown: nil) - timer_doubles << dbl - dbl + context 'when Concurrent::TimerTasks are stopped' do + let(:tasks) { [] } + let(:wait_for_termination_response) { nil } + + before do + allow(Concurrent::TimerTask).to receive(:new) do |*| + dbl = double('TimerTask', execute: nil, kill: nil, wait_for_termination: wait_for_termination_response) + tasks << dbl + dbl + end + + periodic_updater.setup_updates end - periodic_updater.setup_updates + context 'when tasks are terminated in time' do + let(:wait_for_termination_response) { true } - expect(timer_doubles.size).to eq(9) - expect(timer_doubles).to all(receive(:shutdown).once) + it 'stops all tasks and returns true' do + expect(tasks.size).to eq(9) + expect(tasks).to all(receive(:kill).once) + expect(tasks).to all(receive(:wait_for_termination).with(1).once) - periodic_updater.stop_updates + expect(periodic_updater.stop_updates).to be(true) + end + end + + context 'when tasks are not terminated in time' do + let(:wait_for_termination_response) { false } + + it 'stops all tasks and returns false' do + expect(tasks.size).to eq(9) + expect(tasks).to all(receive(:kill).once) + expect(tasks).to all(receive(:wait_for_termination).with(1).once) + + expect(periodic_updater.stop_updates).to be(false) + end + end end end end @@ -642,22 +660,6 @@ module VCAP::CloudController::Metrics end end - describe '#update!' do - it 'calls all update methods' do - expect(periodic_updater).to receive(:update_user_count).once - expect(periodic_updater).to receive(:update_job_queue_length).once - expect(periodic_updater).to receive(:update_job_queue_load).once - expect(periodic_updater).to receive(:update_failed_job_count).once - expect(periodic_updater).to receive(:update_vitals).once - expect(periodic_updater).to receive(:update_log_counts).once - expect(periodic_updater).to receive(:update_task_stats).once - expect(periodic_updater).to receive(:update_deploying_count).once - expect(periodic_updater).to receive(:update_webserver_stats).once - - periodic_updater.update! - end - end - describe '#catch_error' do it 'calls a block' do was_called = false diff --git a/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb b/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb index 40199689cc..31b62c6334 100644 --- a/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb +++ b/spec/unit/lib/cloud_controller/runners/puma_runner_spec.rb @@ -204,8 +204,16 @@ module VCAP::CloudController end describe 'after_stopped' do - it 'stops the TimerTasks and logs incomplete requests' do - expect(periodic_updater).to receive(:stop_updates) + it 'stops the TimerTasks' do + expect(periodic_updater).to receive(:stop_updates).and_return(true) + expect(logger).to receive(:info).with(/Successfully stopped periodic updates/) + + puma_launcher.events.fire(:after_stopped) + end + + it 'logs a warning if stopping the TimerTasks fails' do + expect(periodic_updater).to receive(:stop_updates).and_return(false) + expect(logger).to receive(:warn).with(/Failed to stop all periodic update tasks/) puma_launcher.events.fire(:after_stopped) end