From 3c060d7e72c7ffff1a29c563f59d9dc10560d65f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Thu, 9 Apr 2026 17:23:13 +0200 Subject: [PATCH 1/6] feat: add DelayedJobsRecover scheduled job to re-enqueue stuck service operations Introduces a new periodic recovery job that scans permanently failed delayed_jobs and re-enqueues polling for service operations still in progress at the broker. Recovers cases where a transient DB connection error caused the polling job to fail permanently (max_attempts=1) while the broker operation was still running, leaving the service instance stuck in 'in progress' with no active poller. --- app/jobs/runtime/delayed_jobs_recover.rb | 91 +++++++++++++++++++ config/cloud_controller.yml | 3 +- lib/cloud_controller/clock/scheduler.rb | 3 +- .../config_schemas/clock_schema.rb | 3 + lib/cloud_controller/jobs.rb | 1 + lib/tasks/jobs.rake | 1 + 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 app/jobs/runtime/delayed_jobs_recover.rb diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb new file mode 100644 index 00000000000..6caa2a11397 --- /dev/null +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -0,0 +1,91 @@ +module VCAP::CloudController + module Jobs + module Runtime + class DelayedJobsRecover < VCAP::CloudController::Jobs::CCJob + RECOVERABLE_OPERATIONS = %w[ + service_instance.create + ].freeze + + def perform + logger.info('Recover halted delayed jobs') + recover + end + + def max_attempts + 1 + end + + private + + def recover + # find delayed jobs where failed_at is set (permanently failed) + # and still within the max polling duration (not expired) + cutoff_time = Time.now - default_maximum_duration_seconds + dead_delayed_jobs = Delayed::Job. + exclude(failed_at: nil). + where { created_at > cutoff_time }. + order(:created_at). + limit(batch_size) + + dead_delayed_jobs.each do |delayed| + # pollable job state can be POLLING or FAILED depending on whether the failure + # hook managed to persist before the db connection was lost + pollable = PollableJobModel.where(delayed_job_guid: delayed.guid). + where(state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + first + next unless pollable + next unless RECOVERABLE_OPERATIONS.include?(pollable.operation) + + # last_operation.state must be 'in progress'. This confirms the broker is still + # working on the operation and CC is the one that gave up, not the broker + entity = find_entity(pollable) + next unless entity + next unless entity.last_operation&.state == 'in progress' + + reenqueue(pollable, delayed) + end + end + + def find_entity(pollable) + # TODO: resource_type field can be used + case pollable.operation + when 'service_instance.create' + ManagedServiceInstance.first(guid: pollable.resource_guid) + end + end + + def reenqueue(pollable, delayed) + # re-verify atomically that the pollable job still points to this dead delayed_job. + # if another process already re-enqueued a new job, pollable.delayed_job_guid was + # updated to the new delayed_job's guid, so where clause returns nil and we skip safely. + PollableJobModel.db.transaction do + pjob = PollableJobModel.where(guid: pollable.guid, + delayed_job_guid: delayed.guid, + state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + for_update.first + return unless pjob + + # bring the record into a clean polling state + pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE) + + # unwrap the serialized handler and re-enqueue via the reoccurring job + inner_job = Jobs::Enqueuer.unwrap_job(delayed.payload_object) + inner_job.send(:enqueue_next_job, pjob) + end + end + + def default_maximum_duration_seconds + Config.config.get(:broker_client_max_async_poll_duration_minutes).minutes + end + + def logger + @logger ||= Steno.logger('cc.background') + end + + def batch_size + 10 + end + end + end + end +end diff --git a/config/cloud_controller.yml b/config/cloud_controller.yml index 0983c7f0673..a9fd9394a8d 100644 --- a/config/cloud_controller.yml +++ b/config/cloud_controller.yml @@ -369,7 +369,8 @@ diego_sync: pending_droplets: frequency_in_seconds: 300 expiration_in_seconds: 42 - +delayed_jobs_recover: + frequency_in_seconds: 600 pending_builds: expiration_in_seconds: 42 frequency_in_seconds: 300 diff --git a/lib/cloud_controller/clock/scheduler.rb b/lib/cloud_controller/clock/scheduler.rb index 388b5db11d5..914dc67e350 100644 --- a/lib/cloud_controller/clock/scheduler.rb +++ b/lib/cloud_controller/clock/scheduler.rb @@ -24,7 +24,8 @@ class Scheduler { name: 'pending_droplets', class: Jobs::Runtime::PendingDropletCleanup }, { name: 'pending_builds', class: Jobs::Runtime::PendingBuildCleanup }, { name: 'failed_jobs', class: Jobs::Runtime::FailedJobsCleanup }, - { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup } + { name: 'service_operations_initial_cleanup', class: Jobs::Runtime::ServiceOperationsInitialCleanup }, + { name: 'delayed_jobs_recover', class: Jobs::Runtime::DelayedJobsRecover } ].freeze def initialize(config) diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb index 81976bbf551..3ad399e50ca 100644 --- a/lib/cloud_controller/config_schemas/clock_schema.rb +++ b/lib/cloud_controller/config_schemas/clock_schema.rb @@ -34,6 +34,9 @@ class ClockSchema < VCAP::Config completed_tasks: { cutoff_age_in_days: Integer }, + delayed_jobs_recover: { + frequency_in_seconds: Integer + }, default_health_check_timeout: Integer, uaa: { diff --git a/lib/cloud_controller/jobs.rb b/lib/cloud_controller/jobs.rb index 9f39f53d152..47d49f4dde9 100644 --- a/lib/cloud_controller/jobs.rb +++ b/lib/cloud_controller/jobs.rb @@ -25,6 +25,7 @@ require 'jobs/runtime/expired_blob_cleanup' require 'jobs/runtime/expired_orphaned_blob_cleanup' require 'jobs/runtime/expired_resource_cleanup' +require 'jobs/runtime/delayed_jobs_recover' require 'jobs/runtime/failed_jobs_cleanup' require 'jobs/runtime/service_operations_initial_cleanup' require 'jobs/runtime/legacy_jobs' diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake index 86c33fe9208..74946b32a25 100644 --- a/lib/tasks/jobs.rake +++ b/lib/tasks/jobs.rake @@ -49,6 +49,7 @@ namespace :jobs do 'audit_events', 'failed_jobs', 'service_operations_initial_cleanup', + 'delayed_jobs_recover', 'service_usage_events', 'completed_tasks', 'expired_blob_cleanup', From b8da6a7fbecbe5c3d7038e96cf88874f97a0c3df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Thu, 9 Apr 2026 17:46:00 +0200 Subject: [PATCH 2/6] fix: comment is fixed --- app/jobs/runtime/delayed_jobs_recover.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb index 6caa2a11397..e0af549b8c0 100644 --- a/app/jobs/runtime/delayed_jobs_recover.rb +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -65,7 +65,7 @@ def reenqueue(pollable, delayed) for_update.first return unless pjob - # bring the record into a clean polling state + # bring the pollable job into the clean polling state pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE) # unwrap the serialized handler and re-enqueue via the reoccurring job From 0ec67e44938117a1655ecdd528a455baf4f29489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Thu, 9 Apr 2026 18:07:24 +0200 Subject: [PATCH 3/6] feat: new sheduling jobs test is added --- spec/unit/lib/cloud_controller/clock/scheduler_spec.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb index 20d0e49b5bd..beb7a885091 100644 --- a/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb +++ b/spec/unit/lib/cloud_controller/clock/scheduler_spec.rb @@ -21,6 +21,7 @@ module VCAP::CloudController failed_jobs: { frequency_in_seconds: 400, cutoff_age_in_days: 4, max_number_of_failed_delayed_jobs: 10 }, pollable_jobs: { cutoff_age_in_days: 2 }, service_operations_initial_cleanup: { frequency_in_seconds: 600 }, + delayed_jobs_recover: { frequency_in_seconds: 600 }, service_usage_events: { cutoff_age_in_days: 5 }, completed_tasks: { cutoff_age_in_days: 6 }, pending_droplets: { frequency_in_seconds: 300, expiration_in_seconds: 600 }, @@ -161,6 +162,12 @@ module VCAP::CloudController expect(block.call).to be_instance_of(Jobs::Runtime::ServiceOperationsInitialCleanup) end + expect(clock).to receive(:schedule_frequent_worker_job) do |args, &block| + expect(args).to eql(name: 'delayed_jobs_recover', interval: 600) + expect(Jobs::Runtime::DelayedJobsRecover).to receive(:new).and_call_original + expect(block.call).to be_instance_of(Jobs::Runtime::DelayedJobsRecover) + end + schedule.start end From df047e329aeca8b57f814d3b2c6741100212b0b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Tue, 5 May 2026 13:40:32 +0200 Subject: [PATCH 4/6] fix: replace N+1 lookup with single join query in DelayedJobsRecover The previous implementation queried dead delayed_jobs then performed separate lookups per row to find the pollable job, entity, and last operation state. Replace with a single 4-table join across service_instance_operations, service_instances, jobs, and delayed_jobs, filtering all conditions in one query --- app/jobs/runtime/delayed_jobs_recover.rb | 74 +++---- ...05071445_add_jobs_operation_state_index.rb | 38 ++++ ...445_add_jobs_operation_state_index_spec.rb | 63 ++++++ .../jobs/runtime/delayed_jobs_recover_spec.rb | 194 ++++++++++++++++++ 4 files changed, 332 insertions(+), 37 deletions(-) create mode 100644 db/migrations/20260505071445_add_jobs_operation_state_index.rb create mode 100644 spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb create mode 100644 spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb index e0af549b8c0..3d38ddbe5be 100644 --- a/app/jobs/runtime/delayed_jobs_recover.rb +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -2,10 +2,6 @@ module VCAP::CloudController module Jobs module Runtime class DelayedJobsRecover < VCAP::CloudController::Jobs::CCJob - RECOVERABLE_OPERATIONS = %w[ - service_instance.create - ].freeze - def perform logger.info('Recover halted delayed jobs') recover @@ -18,48 +14,52 @@ def max_attempts private def recover - # find delayed jobs where failed_at is set (permanently failed) - # and still within the max polling duration (not expired) + # Find stuck service instance create operations where the broker is still working + # but CC's polling job has permanently failed due to a transient error (e.g. brief db connection flip). + # Join path: service_instance_operations → service_instances → jobs → delayed_jobs. + # + # Filters: + # - service_instance_operations.state='in progress': the broker has not yet reported a final state + # (succeeded or failed) that CC could successfully persist; if CC had received and saved a final + # state from the broker, this column would already be 'succeeded' or 'failed' — not 'in progress' + # - service_instance_operations.type='create': scope to create operations only + # - service_instance_operations.created_at > cutoff: operations beyond the max async polling window + # are intentionally excluded — the broker has given up on them too, so re-enqueuing is pointless + # - jobs.state IN (POLLING, FAILED): the pollable job has not reached a terminal success state; + # POLLING covers the case where the failure hook itself couldn't write FAILED due to the DB flip + # - jobs.operation='service_instance.create': prevents matching update/delete jobs for the same + # service instance that happen to share the same resource_guid + # - delayed_jobs.failed_at IS NOT NULL: the delayed job permanently failed (exhausted max_attempts); + # jobs still alive or locked have failed_at=NULL and must not be touched cutoff_time = Time.now - default_maximum_duration_seconds - dead_delayed_jobs = Delayed::Job. - exclude(failed_at: nil). - where { created_at > cutoff_time }. - order(:created_at). - limit(batch_size) - - dead_delayed_jobs.each do |delayed| - # pollable job state can be POLLING or FAILED depending on whether the failure - # hook managed to persist before the db connection was lost - pollable = PollableJobModel.where(delayed_job_guid: delayed.guid). - where(state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). - first - next unless pollable - next unless RECOVERABLE_OPERATIONS.include?(pollable.operation) - - # last_operation.state must be 'in progress'. This confirms the broker is still - # working on the operation and CC is the one that gave up, not the broker - entity = find_entity(pollable) - next unless entity - next unless entity.last_operation&.state == 'in progress' + stuck = ServiceInstanceOperation. + join(:service_instances, id: Sequel[:service_instance_operations][:service_instance_id]). + join(:jobs, resource_guid: Sequel[:service_instances][:guid]). + join(:delayed_jobs, guid: Sequel[:jobs][:delayed_job_guid]). + where(Sequel[:service_instance_operations][:state] => 'in progress'). + where(Sequel[:service_instance_operations][:type] => 'create'). + where { Sequel[:service_instance_operations][:created_at] > cutoff_time }. + where(Sequel[:jobs][:state] => [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + where(Sequel[:jobs][:operation] => 'service_instance.create'). + exclude(Sequel[:delayed_jobs][:failed_at] => nil). + select(Sequel[:jobs][:guid].as(:pollable_guid), Sequel[:delayed_jobs][:guid].as(:dj_guid)). + order(Sequel[:service_instance_operations][:created_at]). + limit(batch_size) - reenqueue(pollable, delayed) - end - end + stuck.each do |row| + delayed = Delayed::Job.first(guid: row[:dj_guid]) + next unless delayed - def find_entity(pollable) - # TODO: resource_type field can be used - case pollable.operation - when 'service_instance.create' - ManagedServiceInstance.first(guid: pollable.resource_guid) + reenqueue(row[:pollable_guid], delayed) end end - def reenqueue(pollable, delayed) + def reenqueue(pollable_guid, delayed) # re-verify atomically that the pollable job still points to this dead delayed_job. # if another process already re-enqueued a new job, pollable.delayed_job_guid was # updated to the new delayed_job's guid, so where clause returns nil and we skip safely. PollableJobModel.db.transaction do - pjob = PollableJobModel.where(guid: pollable.guid, + pjob = PollableJobModel.where(guid: pollable_guid, delayed_job_guid: delayed.guid, state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). for_update.first @@ -68,7 +68,7 @@ def reenqueue(pollable, delayed) # bring the pollable job into the clean polling state pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE) - # unwrap the serialized handler and re-enqueue via the reoccurring job + # unwrap the serialized handler and re-enqueue via the reoccurring job's enqueue_next_job method inner_job = Jobs::Enqueuer.unwrap_job(delayed.payload_object) inner_job.send(:enqueue_next_job, pjob) end diff --git a/db/migrations/20260505071445_add_jobs_operation_state_index.rb b/db/migrations/20260505071445_add_jobs_operation_state_index.rb new file mode 100644 index 00000000000..aeefa7f731e --- /dev/null +++ b/db/migrations/20260505071445_add_jobs_operation_state_index.rb @@ -0,0 +1,38 @@ +Sequel.migration do + no_transaction # required for concurrently option on postgres + + up do + if database_type == :postgres + VCAP::Migration.with_concurrent_timeout(self) do + add_index :jobs, %i[operation state], + name: :jobs_operation_state_index, + where: "state IN ('POLLING', 'FAILED')", + if_not_exists: true, + concurrently: true + end + elsif database_type == :mysql + alter_table(:jobs) do + # rubocop:disable Sequel/ConcurrentIndex -- MySQL does not support concurrent index operations + add_index %i[operation state], name: :jobs_operation_state_index unless @db.indexes(:jobs).key?(:jobs_operation_state_index) + # rubocop:enable Sequel/ConcurrentIndex + end + end + end + + down do + if database_type == :postgres + VCAP::Migration.with_concurrent_timeout(self) do + drop_index :jobs, %i[operation state], + name: :jobs_operation_state_index, + if_exists: true, + concurrently: true + end + elsif database_type == :mysql + alter_table(:jobs) do + # rubocop:disable Sequel/ConcurrentIndex + drop_index %i[operation state], name: :jobs_operation_state_index if @db.indexes(:jobs).key?(:jobs_operation_state_index) + # rubocop:enable Sequel/ConcurrentIndex + end + end + end +end diff --git a/spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb b/spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb new file mode 100644 index 00000000000..65edad88827 --- /dev/null +++ b/spec/migrations/20260505071445_add_jobs_operation_state_index_spec.rb @@ -0,0 +1,63 @@ +# rubocop:disable Migration/TooManyMigrationRuns +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +def operation_state_partial_index_present + # partial indexes are not returned in `db.indexes`. That's why we have to query this information manually. + partial_indexes = db.fetch("SELECT * FROM pg_indexes WHERE tablename = 'jobs' AND indexname = 'jobs_operation_state_index';") + + index_present = false + partial_indexes.each do |_index| + index_present = true + end + + index_present +end + +RSpec.describe 'migration to add operation_state_index on jobs table', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260505071445_add_jobs_operation_state_index.rb' } + end + + describe 'jobs table' do + it 'adds index and handles idempotency gracefully' do + if db.database_type == :postgres + # Test up migration + expect(operation_state_partial_index_present).to be_falsey + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error + expect(operation_state_partial_index_present).to be_truthy + + # Test up migration idempotency + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error + expect(operation_state_partial_index_present).to be_truthy + + # Test down migration + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error + expect(operation_state_partial_index_present).to be_falsey + + # Test down migration idempotency + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error + expect(operation_state_partial_index_present).to be_falsey + + elsif db.database_type == :mysql + # Test up migration + expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index) + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error + expect(db.indexes(:jobs)).to include(:jobs_operation_state_index) + + # Test up migration idempotency + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error + expect(db.indexes(:jobs)).to include(:jobs_operation_state_index) + + # Test down migration + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error + expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index) + + # Test down migration idempotency + expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error + expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index) + end + end + end +end +# rubocop:enable Migration/TooManyMigrationRuns diff --git a/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb b/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb new file mode 100644 index 00000000000..70dbc77639c --- /dev/null +++ b/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb @@ -0,0 +1,194 @@ +require 'spec_helper' + +module VCAP::CloudController + module Jobs::Runtime + RSpec.describe DelayedJobsRecover, job_context: :worker do + subject(:job) { DelayedJobsRecover.new } + + let(:fake_logger) { instance_double(Steno::Logger, info: nil) } + let(:max_poll_duration_minutes) { 60 } + + before do + allow(Steno).to receive(:logger).and_return(fake_logger) + TestConfig.override(broker_client_max_async_poll_duration_minutes: max_poll_duration_minutes) + end + + # Builds a fully stuck scenario that the job should pick up and re-enqueue by default. + # All filter conditions are satisfied: sio is in progress/create/within cutoff, + # pjob is FAILED with operation=service_instance.create, delayed_job has failed_at set. + # Override individual parameters to break a single filter and test exclusion. + def make_stuck_scenario( + sio_state: 'in progress', + sio_type: 'create', + sio_created_at: Time.now, + pjob_state: PollableJobModel::FAILED_STATE, + dj_failed_at: Time.now + ) + service_instance = ManagedServiceInstance.make + + ServiceInstanceOperation.make( + service_instance_id: service_instance.id, + type: sio_type, + state: sio_state, + created_at: sio_created_at + ) + + dj = Delayed::Job.create!( + guid: SecureRandom.uuid, + handler: 'fake', + run_at: Time.now, + failed_at: dj_failed_at, + queue: 'cc-generic' + ) + + pjob = PollableJobModel.make( + state: pjob_state, + operation: 'service_instance.create', + resource_guid: service_instance.guid, + resource_type: 'service_instances', + delayed_job_guid: dj.guid + ) + + { service_instance: service_instance, pjob: pjob, delayed_job: dj } + end + + it { is_expected.to be_a_valid_job } + + describe '#perform' do + context 'when there are no stuck jobs' do + it 'does nothing' do + make_stuck_scenario(sio_state: 'succeeded') + expect(fake_logger).to receive(:info).with('Recover halted delayed jobs') + expect { job.perform }.not_to(change { PollableJobModel.where(state: PollableJobModel::POLLING_STATE).count }) + end + end + + context 'when sio state is not in progress' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario(sio_state: 'succeeded') + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE) + end + end + + context 'when sio type is not create' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario(sio_type: 'update') + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE) + end + end + + context 'when sio created_at is beyond the max polling window' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario(sio_created_at: Time.now - (max_poll_duration_minutes + 1).minutes) + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE) + end + end + + context 'when delayed_job.failed_at is nil (job still running or locked)' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario(dj_failed_at: nil) + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE) + end + end + + context 'when pollable job state is COMPLETE' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario(pjob_state: PollableJobModel::COMPLETE_STATE) + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::COMPLETE_STATE) + end + end + + context 'when pollable job state is PROCESSING' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario(pjob_state: PollableJobModel::PROCESSING_STATE) + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::PROCESSING_STATE) + end + end + + context 'when pollable job operation is not service_instance.create' do + it 'does not re-enqueue' do + scenario = make_stuck_scenario + scenario[:pjob].update(operation: 'service_instance.update') + job.perform + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::FAILED_STATE) + end + end + + context 'when a job is stuck with state FAILED' do + it 'calls reenqueue' do + scenario = make_stuck_scenario + expect_any_instance_of(described_class).to receive(:reenqueue).with(scenario[:pjob].guid, anything) + job.perform + end + end + + context 'when a job is stuck with state POLLING' do + it 'calls reenqueue (covers DB flip before failure hook could write FAILED)' do + scenario = make_stuck_scenario(pjob_state: PollableJobModel::POLLING_STATE) + expect_any_instance_of(described_class).to receive(:reenqueue).with(scenario[:pjob].guid, anything) + job.perform + end + end + + context 'when there are multiple stuck jobs within the batch size' do + it 'calls reenqueue for each' do + 3.times { make_stuck_scenario } + expect_any_instance_of(described_class).to receive(:reenqueue).exactly(3).times + job.perform + end + end + + context 'when there are more stuck jobs than the batch size (10)' do + it 'processes only up to 10 jobs per run' do + 11.times { make_stuck_scenario } + expect_any_instance_of(described_class).to receive(:reenqueue).exactly(10).times + job.perform + end + end + end + + describe '#reenqueue' do + let(:inner_job) { instance_double(Jobs::ReoccurringJob) } + + before do + allow(Jobs::Enqueuer).to receive(:unwrap_job).and_return(inner_job) + allow(inner_job).to receive(:enqueue_next_job) + end + + it 'resets pjob to POLLING state and clears cf_api_error' do + scenario = make_stuck_scenario + scenario[:pjob].update(cf_api_error: 'some error') + + job.send(:reenqueue, scenario[:pjob].guid, scenario[:delayed_job]) + + expect(scenario[:pjob].reload.state).to eq(PollableJobModel::POLLING_STATE) + expect(scenario[:pjob].reload.cf_api_error).to be_nil + end + + it 'calls enqueue_next_job on the unwrapped inner job' do + scenario = make_stuck_scenario + + expect(inner_job).to receive(:enqueue_next_job).with(instance_of(PollableJobModel)) + + job.send(:reenqueue, scenario[:pjob].guid, scenario[:delayed_job]) + end + + context 'when another process already re-enqueued the job (delayed_job_guid changed)' do + it 'skips without raising and does not call enqueue_next_job' do + scenario = make_stuck_scenario + scenario[:pjob].update(delayed_job_guid: 'some-other-guid') + + expect(inner_job).not_to receive(:enqueue_next_job) + expect { job.send(:reenqueue, scenario[:pjob].guid, scenario[:delayed_job]) }.not_to raise_error + end + end + end + end + end +end From f37a14c68e75d997a3d29a56f1bfdb0d82758aa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Tue, 5 May 2026 15:38:01 +0200 Subject: [PATCH 5/6] fix: warn added to mock logger --- spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb b/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb index 70dbc77639c..485e5df7969 100644 --- a/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb +++ b/spec/unit/jobs/runtime/delayed_jobs_recover_spec.rb @@ -5,7 +5,7 @@ module Jobs::Runtime RSpec.describe DelayedJobsRecover, job_context: :worker do subject(:job) { DelayedJobsRecover.new } - let(:fake_logger) { instance_double(Steno::Logger, info: nil) } + let(:fake_logger) { instance_double(Steno::Logger, info: nil, warn: nil) } let(:max_poll_duration_minutes) { 60 } before do From 655a7dbdedc2297de5b53721a5fe3b51dd4d5d56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Wed, 6 May 2026 10:49:06 +0200 Subject: [PATCH 6/6] fix: removed the state condition, since it doesn't add any valued to query --- app/jobs/runtime/delayed_jobs_recover.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/jobs/runtime/delayed_jobs_recover.rb b/app/jobs/runtime/delayed_jobs_recover.rb index 3d38ddbe5be..11653e977bc 100644 --- a/app/jobs/runtime/delayed_jobs_recover.rb +++ b/app/jobs/runtime/delayed_jobs_recover.rb @@ -60,8 +60,7 @@ def reenqueue(pollable_guid, delayed) # updated to the new delayed_job's guid, so where clause returns nil and we skip safely. PollableJobModel.db.transaction do pjob = PollableJobModel.where(guid: pollable_guid, - delayed_job_guid: delayed.guid, - state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]). + delayed_job_guid: delayed.guid). for_update.first return unless pjob