From a45adfe807637a484dcbcf49a79bd3b4d112d42a Mon Sep 17 00:00:00 2001 From: Philipp Thun Date: Thu, 8 Jan 2026 18:33:42 +0100 Subject: [PATCH 1/2] Improve Diego task sync - Only select required fields instead of all when batching CC tasks. - Process missing diego tasks in workpool (i.e. call to BBS, DB select + update). - Add helper methods to submit to workpool. - Adapt DatabaseIsolation: reset 'api' config is only needed for 'truncation' isolation (in reset_tables). --- lib/cloud_controller/diego/tasks_sync.rb | 63 +++++++++++-------- spec/db_spec_helper.rb | 6 +- .../packages_controller_index_spec.rb | 2 +- spec/spec_helper.rb | 20 +++--- spec/support/database_isolation.rb | 12 ++-- .../cloud_controller/diego/tasks_sync_spec.rb | 8 ++- 6 files changed, 59 insertions(+), 52 deletions(-) diff --git a/lib/cloud_controller/diego/tasks_sync.rb b/lib/cloud_controller/diego/tasks_sync.rb index f9528b239e6..437fd2fa5a1 100644 --- a/lib/cloud_controller/diego/tasks_sync.rb +++ b/lib/cloud_controller/diego/tasks_sync.rb @@ -19,37 +19,25 @@ def sync diego_tasks = bbs_task_client.fetch_tasks.index_by(&:task_guid) - batched_cc_tasks do |cc_tasks| - tasks_to_fail = [] + to_update = [] + to_cancel = [] + batched_cc_tasks do |cc_tasks| cc_tasks.each do |cc_task| diego_task = diego_tasks.delete(cc_task.guid) next unless [TaskModel::RUNNING_STATE, TaskModel::CANCELING_STATE].include? cc_task.state if diego_task.nil? - tasks_to_fail << cc_task.guid if diego_task_missing?(cc_task.guid) && !task_finished_while_iterating?(cc_task.guid) - logger.info('missing-diego-task', task_guid: cc_task.guid) + to_update << cc_task.guid elsif cc_task.state == TaskModel::CANCELING_STATE - workpool.submit(cc_task.guid) do |guid| - bbs_task_client.cancel_task(guid) - logger.info('canceled-cc-task', task_guid: guid) - end - end - end - - unless tasks_to_fail.empty? - TaskModel.where(guid: tasks_to_fail).each do |cc_task| - cc_task.update(state: TaskModel::FAILED_STATE, failure_reason: BULKER_TASK_FAILURE) + to_cancel << cc_task.guid end end end - diego_tasks.each_key do |task_guid| - workpool.submit(task_guid) do |guid| - bbs_task_client.cancel_task(guid) - logger.info('missing-cc-task', task_guid: guid) - end - end + update_missing_diego_tasks(to_update) + cancel_cc_tasks(to_cancel) + cancel_missing_cc_tasks(diego_tasks) workpool.drain @@ -87,13 +75,36 @@ def formatted_backtrace_from_error(error) error.backtrace.present? ? error.backtrace.join("\n") + "\n..." : '' end - def diego_task_missing?(task_guid) - bbs_task_client.fetch_task(task_guid).nil? + def update_missing_diego_tasks(to_update) + to_update.each do |task_guid| + workpool.submit(task_guid) do |guid| + diego_task_missing = bbs_task_client.fetch_task(guid).nil? + if diego_task_missing + # Mark the CC task as failed. Don't update tasks that are already in a terminal state. + task = TaskModel.where(guid:).exclude(state: [TaskModel::FAILED_STATE, TaskModel::SUCCEEDED_STATE]).first + task&.update(state: TaskModel::FAILED_STATE, failure_reason: BULKER_TASK_FAILURE) # invoke model's update method to create an event + logger.info('missing-diego-task', task_guid: guid) + end + end + end + end + + def cancel_cc_tasks(to_cancel) + to_cancel.each do |task_guid| + workpool.submit(task_guid) do |guid| + bbs_task_client.cancel_task(guid) + logger.info('canceled-cc-task', task_guid: guid) + end + end end - def task_finished_while_iterating?(task_guid) - cc_task = TaskModel.find(guid: task_guid) - [TaskModel::FAILED_STATE, TaskModel::SUCCEEDED_STATE].include?(cc_task.state) + def cancel_missing_cc_tasks(to_cancel_missing) + to_cancel_missing.each_key do |task_guid| + workpool.submit(task_guid) do |guid| + bbs_task_client.cancel_task(guid) + logger.info('missing-cc-task', task_guid: guid) + end + end end def batched_cc_tasks @@ -101,7 +112,7 @@ def batched_cc_tasks loop do tasks = TaskModel.where( Sequel.lit('tasks.id > ?', last_id) - ).order(:id).limit(BATCH_SIZE).all + ).order(:id).limit(BATCH_SIZE).select(:id, :guid, :state).all yield tasks return if tasks.count < BATCH_SIZE diff --git a/spec/db_spec_helper.rb b/spec/db_spec_helper.rb index 39d535e1990..426fa2a8c89 100644 --- a/spec/db_spec_helper.rb +++ b/spec/db_spec_helper.rb @@ -26,11 +26,7 @@ end rspec_config.around do |example| - # DatabaseIsolation requires the api config context - TestConfig.context = :api - TestConfig.reset - - isolation = DatabaseIsolation.choose(example.metadata[:isolation], TestConfig.config_instance, DbConfig.new.connection) + isolation = DatabaseIsolation.choose(example.metadata[:isolation], DbConfig.new.connection) isolation.cleanly { example.run } end end diff --git a/spec/performance/packages_controller_index_spec.rb b/spec/performance/packages_controller_index_spec.rb index 3bf5d53e376..e23d86f3b0c 100644 --- a/spec/performance/packages_controller_index_spec.rb +++ b/spec/performance/packages_controller_index_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' require 'rails_helper' -RSpec.describe PackagesController, type: :controller do # , isolation: :truncation +RSpec.describe PackagesController, type: :controller do describe '#index' do let(:user) { set_current_user(VCAP::CloudController::User.make) } let(:app_model) { VCAP::CloudController::AppModel.make } diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 81611b76951..da1a3b2a1e1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -166,6 +166,13 @@ end rspec_config.before do + Delayed::Worker.destroy_failed_jobs = false + Sequel::Deprecation.output = StringIO.new + Sequel::Deprecation.backtrace_filter = 5 + + TestConfig.context = example.metadata[:job_context] || :api + TestConfig.reset + Fog::Mock.reset if Fog.mock? @@ -175,13 +182,6 @@ CloudController::DependencyLocator.instance.buildpack_blobstore.ensure_bucket_exists end - Delayed::Worker.destroy_failed_jobs = false - Sequel::Deprecation.output = StringIO.new - Sequel::Deprecation.backtrace_filter = 5 - - TestConfig.context = example.metadata[:job_context] || :api - TestConfig.reset - VCAP::CloudController::SecurityContext.clear allow_any_instance_of(VCAP::CloudController::UaaTokenDecoder).to receive(:uaa_issuer).and_return(UAAIssuer::ISSUER) @@ -190,11 +190,7 @@ end rspec_config.around do |example| - # DatabaseIsolation requires the api config context - TestConfig.context = :api - TestConfig.reset - - isolation = DatabaseIsolation.choose(example.metadata[:isolation], TestConfig.config_instance, DbConfig.new.connection) + isolation = DatabaseIsolation.choose(example.metadata[:isolation], DbConfig.new.connection) isolation.cleanly { example.run } end diff --git a/spec/support/database_isolation.rb b/spec/support/database_isolation.rb index 891944063da..e8bbab49e62 100644 --- a/spec/support/database_isolation.rb +++ b/spec/support/database_isolation.rb @@ -1,16 +1,15 @@ module DatabaseIsolation - def self.choose(isolation, config, db) + def self.choose(isolation, db) case isolation when :truncation - TruncateTables.new(config, db) + TruncateTables.new(db) else RollbackTransaction.new end end class TruncateTables - def initialize(config, db) - @config = config + def initialize(db) @db = db end @@ -24,7 +23,10 @@ def reset_tables table_truncator = TableTruncator.new(db) table_truncator.truncate_tables - VCAP::CloudController::Seeds.write_seed_data(config) + # VCAP::CloudController::Seeds requires the :api config + TestConfig.context = :api + TestConfig.reset + VCAP::CloudController::Seeds.write_seed_data(TestConfig.config_instance) end private diff --git a/spec/unit/lib/cloud_controller/diego/tasks_sync_spec.rb b/spec/unit/lib/cloud_controller/diego/tasks_sync_spec.rb index 7029d1926b6..64c18684496 100644 --- a/spec/unit/lib/cloud_controller/diego/tasks_sync_spec.rb +++ b/spec/unit/lib/cloud_controller/diego/tasks_sync_spec.rb @@ -45,7 +45,8 @@ module Diego end end - context 'when a running CC task is missing from BBS' do + context 'when a running CC task is missing from BBS', isolation: :truncation do + # Can't use transactions for isolation because we're using multiple threads let!(:running_task) { TaskModel.make(:running, created_at: 1.minute.ago) } let!(:canceling_task) { TaskModel.make(:canceling, created_at: 1.minute.ago) } let!(:start_event_for_running_task) { AppUsageEvent.make(task_guid: running_task.guid, state: 'TASK_STARTED') } @@ -270,7 +271,7 @@ module Diego end end - context 'when a new task is created after cc initally fetches tasks from bbs' do + context 'when a new task is created after cc initially fetches tasks from bbs' do context 'and the newly started task does not complete before checking to see if it should fail' do let!(:cc_task) { TaskModel.make(guid: 'some-task-guid', state: TaskModel::RUNNING_STATE) } let(:bbs_task) { ::Diego::Bbs::Models::Task.new(task_guid: 'some-task-guid', state: ::Diego::Bbs::Models::Task::State::Running) } @@ -298,7 +299,8 @@ module Diego end end - context 'and the newly started task completes before the iteration completes' do + context 'and the newly started task completes before the iteration completes', isolation: :truncation do + # Can't use transactions for isolation because we're using multiple threads let!(:cc_task) { TaskModel.make(guid: 'some-task-guid', state: TaskModel::RUNNING_STATE) } let(:bbs_tasks) { [] } From a5570e2a6c7b3a98812b2aa1798a0909add2f145 Mon Sep 17 00:00:00 2001 From: Philipp Thun Date: Thu, 15 Jan 2026 09:27:52 +0100 Subject: [PATCH 2/2] Update lib/cloud_controller/diego/tasks_sync.rb Co-authored-by: Johannes Haass --- lib/cloud_controller/diego/tasks_sync.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/cloud_controller/diego/tasks_sync.rb b/lib/cloud_controller/diego/tasks_sync.rb index 437fd2fa5a1..85ac7e1a942 100644 --- a/lib/cloud_controller/diego/tasks_sync.rb +++ b/lib/cloud_controller/diego/tasks_sync.rb @@ -78,8 +78,7 @@ def formatted_backtrace_from_error(error) def update_missing_diego_tasks(to_update) to_update.each do |task_guid| workpool.submit(task_guid) do |guid| - diego_task_missing = bbs_task_client.fetch_task(guid).nil? - if diego_task_missing + if bbs_task_client.fetch_task(guid).nil? # Mark the CC task as failed. Don't update tasks that are already in a terminal state. task = TaskModel.where(guid:).exclude(state: [TaskModel::FAILED_STATE, TaskModel::SUCCEEDED_STATE]).first task&.update(state: TaskModel::FAILED_STATE, failure_reason: BULKER_TASK_FAILURE) # invoke model's update method to create an event