From 99bf603570db94c78542b78fdca818831a4e2819 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 05:27:10 +0000 Subject: [PATCH 1/5] Add support for Swarming preprocess queue and task scheduling --- .../_internal/base/feature_flags.py | 2 + .../_internal/base/tasks/__init__.py | 5 + .../_internal/cron/schedule_fuzz.py | 132 ++++++++++-------- .../handlers/cron/schedule_fuzz_test.py | 20 +-- 4 files changed, 91 insertions(+), 68 deletions(-) diff --git a/src/clusterfuzz/_internal/base/feature_flags.py b/src/clusterfuzz/_internal/base/feature_flags.py index 9bea8c41f97..0b0e24750e6 100644 --- a/src/clusterfuzz/_internal/base/feature_flags.py +++ b/src/clusterfuzz/_internal/base/feature_flags.py @@ -36,6 +36,8 @@ class FeatureFlags(Enum): PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit' SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution' + # TODO(ibarba): Set this value based off dev & stage metrics and tests. + SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit' @property def flag(self): diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ae0c4e9ed02..156f57e6d7f 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -101,6 +101,11 @@ def get_task_duration(command): UTASK_MAIN_QUEUE = 'utask_main' PREPROCESS_QUEUE = 'preprocess' +SWARMING_QUEUES = { + PREPROCESS_QUEUE: 'preprocess_swarming', + UTASK_MAIN_QUEUE: 'utask_main_swarming', +} + # See https://github.com/google/clusterfuzz/issues/3347 for usage SUBQUEUE_IDENTIFIER = ':' diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index e032923bad8..89708489062 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -13,22 +13,25 @@ # limitations under the License. """Cron job to schedule fuzz tasks that run on batch.""" +from abc import ABC +from abc import abstractmethod import collections import random import time from google.cloud import monitoring_v3 -from clusterfuzz._internal.base import feature_flags from clusterfuzz._internal.base import memoize from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils +from clusterfuzz._internal.base.feature_flags import FeatureFlags from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs PREPROCESS_TARGET_SIZE_DEFAULT = 10000 +SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 @memoize.wrap(memoize.InMemory(60)) @@ -62,15 +65,55 @@ def get_queue_size(creds, project_id, subscription_id): return 0 -class BaseFuzzTaskScheduler: +class BaseFuzzTaskScheduler(ABC): """Base fuzz task scheduler for any deployment of ClusterFuzz.""" - def __init__(self, num_tasks): - self.num_tasks = num_tasks - - def get_fuzz_tasks(self): + @abstractmethod + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: raise NotImplementedError('Child class must implement.') + def schedule_fuzz_tasks(self) -> bool: + """Schedules fuzz tasks.""" + return self._schedule_fuzz_tasks() + + def _schedule_fuzz_tasks( + self, + queue: str = tasks.PREPROCESS_QUEUE, + default_target_size: int = PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag: FeatureFlags = FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT + ) -> bool: + """Internal method to schedule fuzz tasks.""" + project = utils.get_application_id() + start = time.time() + creds = credentials.get_default()[0] + preprocess_queue_size = get_queue_size(creds, project, queue) + + target_size = default_target_size + if target_size_flag.enabled and target_size_flag.content: + target_size = int(target_size_flag.content) + + num_tasks = target_size - preprocess_queue_size + logs.info(f'Queue {queue} size: {preprocess_queue_size}. ' + f'Target: {target_size}. Needed: {num_tasks}.') + + if num_tasks <= 0: + logs.info('Queue size met or exceeded. Not scheduling tasks.') + return False + + fuzz_tasks = self.get_fuzz_tasks(num_tasks) + if not fuzz_tasks: + logs.error('No fuzz tasks found to schedule.') + return False + + logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') + tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') + + end = time.time() + total = end - start + logs.info(f'Task scheduling took {total} seconds.') + return True + class FuzzTaskCandidate: """Data class that holds more info about FuzzerJobs than the ndb.Models do. @@ -101,7 +144,7 @@ def copy(self): class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for OSS-Fuzz.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: # TODO(metzman): Handle high end. # A job's weight is determined by its own weight and the weight of the # project is a part of. First get project weights. @@ -164,11 +207,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: for fuzz_task_candidate in fuzz_task_candidates: weights.append(fuzz_task_candidate.weight) - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.') - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -186,7 +227,7 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for Chrome.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: """Returns fuzz tasks for chrome, weighted by job weight.""" logs.info('Getting jobs for Chrome.') @@ -214,14 +255,12 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: fuzz_task_candidates.append(fuzz_task_candidate) weights = [candidate.weight for candidate in fuzz_task_candidates] - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') if not fuzz_task_candidates: return [] - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -232,51 +271,28 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: ] return fuzz_tasks + def _schedule_swarming_fuzz_tasks(self) -> bool: + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return False -def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: - if utils.is_oss_fuzz(): - scheduler = OssfuzzFuzzTaskScheduler(num_tasks) - else: - scheduler = ChromeFuzzTaskScheduler(num_tasks) - fuzz_tasks = scheduler.get_fuzz_tasks() - return fuzz_tasks - - -def schedule_fuzz_tasks() -> bool: - """Schedules fuzz tasks.""" - - project = utils.get_application_id() - start = time.time() - creds = credentials.get_default()[0] - preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) + swarming_preprocess_queue = tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE] + return self._schedule_fuzz_tasks( + queue=swarming_preprocess_queue, + default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT) - target_size = PREPROCESS_TARGET_SIZE_DEFAULT - target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT - if target_size_flag.enabled and target_size_flag.content: - target_size = int(target_size_flag.content) + def _schedule_batch_fuzz_tasks(self) -> bool: + return self._schedule_fuzz_tasks() - num_tasks = target_size - preprocess_queue_size - logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' - f'Target: {target_size}. Needed: {num_tasks}.') - - if num_tasks <= 0: - logs.info('Queue size met or exceeded. Not scheduling tasks.') - return False - - fuzz_tasks = get_fuzz_tasks(num_tasks) - if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False - - logs.info(f'Adding {fuzz_tasks} to preprocess queue.') - tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') - - end = time.time() - total = end - start - logs.info(f'Task scheduling took {total} seconds.') - return True + def schedule_fuzz_tasks(self) -> bool: + self._schedule_swarming_fuzz_tasks() + self._schedule_batch_fuzz_tasks() + return True def main(): - return schedule_fuzz_tasks() + if utils.is_oss_fuzz(): + scheduler = OssfuzzFuzzTaskScheduler() + else: + scheduler = ChromeFuzzTaskScheduler() + return scheduler.schedule_fuzz_tasks() diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index ae23d78fd07..a007004a082 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self): data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() num_tasks = 5 - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks) comparable_results = [] for task in tasks: comparable_results.append((task.command, task.argument, task.job)) @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self): name=project_name, base_os_version='project-version').put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -186,8 +186,8 @@ def test_os_version_no_version(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -216,8 +216,8 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) return tasks[0] From 07bae1828b1418bd618c60ced70ab1025943636b Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 23:09:55 +0000 Subject: [PATCH 2/5] Update queue name to use correct one --- src/clusterfuzz/_internal/base/tasks/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 156f57e6d7f..7b054d2ee96 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -102,8 +102,8 @@ def get_task_duration(command): PREPROCESS_QUEUE = 'preprocess' SWARMING_QUEUES = { - PREPROCESS_QUEUE: 'preprocess_swarming', - UTASK_MAIN_QUEUE: 'utask_main_swarming', + PREPROCESS_QUEUE: 'preprocess-swarming', + UTASK_MAIN_QUEUE: 'utask_main-swarming', } # See https://github.com/google/clusterfuzz/issues/3347 for usage From de1956810f85424d001eab237bd4937458d129bd Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 20 May 2026 05:23:13 +0000 Subject: [PATCH 3/5] Add Android as a requested platform for chrome jobs --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 5 +++-- .../handlers/cron/schedule_fuzz_test.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 89708489062..89abcbdce58 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -232,8 +232,9 @@ def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: logs.info('Getting jobs for Chrome.') candidates_by_job = {} - # Only consider linux jobs for chrome fuzzing. - job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') + # Only consider LINUX or ANDROID jobs + job_query = data_types.Job.query( + data_types.Job.platform.IN(['LINUX', 'ANDROID'])) for job in ndb_utils.get_all_from_query(job_query): base_os_version = None if job.base_os_version: diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index a007004a082..9caa08c7aba 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -232,3 +232,19 @@ def test_os_version_job_without_version(self): self._setup_chrome_entities() task = self._run_and_get_task() self.assertIsNone(task.extra_info.get('base_os_version')) + + def test_job_filtering(self): + """Tests that jobs are filtered by platform""" + # Test wrong platform. + data_types.Job( + name='windows_job', + project='chrome', + platform='WINDOWS', + environment_string='IS_SWARMING_JOB = True').put() + data_types.FuzzerJob( + job='windows_job', platform='WINDOWS', fuzzer='libFuzzer', + weight=1.0).put() + + scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) + self.assertEqual(len(tasks), 0) From e977447a594bbef3a5e1853f152273f12d078b97 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 22 May 2026 08:39:03 +0000 Subject: [PATCH 4/5] Refactors schedule_fuzz.py to avoid complex inhertance --- .../_internal/cron/schedule_fuzz.py | 194 ++++++++++-------- .../_internal/swarming/__init__.py | 12 +- .../handlers/cron/schedule_fuzz_test.py | 26 +-- .../tests/core/swarming/swarming_test.py | 34 +++ 4 files changed, 171 insertions(+), 95 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 89abcbdce58..e72c860f2d6 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -13,14 +13,14 @@ # limitations under the License. """Cron job to schedule fuzz tasks that run on batch.""" -from abc import ABC -from abc import abstractmethod import collections +from dataclasses import dataclass import random import time from google.cloud import monitoring_v3 +from clusterfuzz._internal import swarming from clusterfuzz._internal.base import memoize from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils @@ -34,6 +34,83 @@ SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 +@dataclass +class Queue: + name: str + default_target_size: int + target_size_flag: FeatureFlags + + +_DEFAULT_QUEUE = Queue( + name=tasks.PREPROCESS_QUEUE, + default_target_size=PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT, +) + +_SWARMING_QUEUE = Queue( + name=tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE], + default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT, +) + + +def _get_jobs_for_platform(platform: str) -> list[data_types.Job]: + """Returns all jobs for the given platform.""" + return ndb_utils.get_all_from_query( + data_types.Job.query(data_types.Job.platform == platform)) + + +def _get_swarming_jobs(): + """Returns all jobs that have swarming environment variables.""" + jobs = [] + jobs.extend(_get_jobs_for_platform('ANDROID')) + jobs.extend(_get_jobs_for_platform('LINUX')) + return [ + job for job in jobs + if swarming.has_swarming_env_vars(job.get_environment()) + ] + + +def _remaining_queue_capacity(queue: Queue) -> int: + """Returns the remaining capacity of the given queue.""" + project = utils.get_application_id() + creds = credentials.get_default()[0] + preprocess_queue_size = get_queue_size(creds, project, queue.name) + + target_size = queue.default_target_size + if queue.target_size_flag.enabled and queue.target_size_flag.content: + target_size = int(queue.target_size_flag.content) + + num_tasks = target_size - preprocess_queue_size + logs.info(f'Queue {queue.name} size: {preprocess_queue_size}. ' + f'Target: {target_size}. Needed: {num_tasks}.') + + return num_tasks + + +def _fill_queue(queue: Queue, provider: 'BaseFuzzTaskProvider'): + """Fills the given queue with tasks from the provider.""" + start = time.time() + num_tasks = _remaining_queue_capacity(queue) + + if num_tasks <= 0: + logs.info('Queue size met or exceeded. Not scheduling tasks.') + return + + fuzz_tasks = provider.get_fuzz_tasks(num_tasks) + if not fuzz_tasks: + logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.') + return + + logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') + tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') + + end = time.time() + total = end - start + logs.info(f'Task scheduling took {total} seconds.') + + @memoize.wrap(memoize.InMemory(60)) def get_queue_size(creds, project_id, subscription_id): """Returns the size of the queue (unacked messages).""" @@ -65,55 +142,12 @@ def get_queue_size(creds, project_id, subscription_id): return 0 -class BaseFuzzTaskScheduler(ABC): - """Base fuzz task scheduler for any deployment of ClusterFuzz.""" +class BaseFuzzTaskProvider(): + """Base fuzz task provider for any deployment of ClusterFuzz.""" - @abstractmethod def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: raise NotImplementedError('Child class must implement.') - def schedule_fuzz_tasks(self) -> bool: - """Schedules fuzz tasks.""" - return self._schedule_fuzz_tasks() - - def _schedule_fuzz_tasks( - self, - queue: str = tasks.PREPROCESS_QUEUE, - default_target_size: int = PREPROCESS_TARGET_SIZE_DEFAULT, - target_size_flag: FeatureFlags = FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT - ) -> bool: - """Internal method to schedule fuzz tasks.""" - project = utils.get_application_id() - start = time.time() - creds = credentials.get_default()[0] - preprocess_queue_size = get_queue_size(creds, project, queue) - - target_size = default_target_size - if target_size_flag.enabled and target_size_flag.content: - target_size = int(target_size_flag.content) - - num_tasks = target_size - preprocess_queue_size - logs.info(f'Queue {queue} size: {preprocess_queue_size}. ' - f'Target: {target_size}. Needed: {num_tasks}.') - - if num_tasks <= 0: - logs.info('Queue size met or exceeded. Not scheduling tasks.') - return False - - fuzz_tasks = self.get_fuzz_tasks(num_tasks) - if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False - - logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') - tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') - - end = time.time() - total = end - start - logs.info(f'Task scheduling took {total} seconds.') - return True - class FuzzTaskCandidate: """Data class that holds more info about FuzzerJobs than the ndb.Models do. @@ -141,8 +175,8 @@ def copy(self): base_os_version=self.base_os_version) -class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): - """Fuzz task scheduler for OSS-Fuzz.""" +class OssfuzzFuzzTaskProvider(BaseFuzzTaskProvider): + """Fuzz task provider for OSS-Fuzz.""" def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: # TODO(metzman): Handle high end. @@ -224,24 +258,26 @@ def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: return fuzz_tasks -class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): - """Fuzz task scheduler for Chrome.""" +class ChromeFuzzTaskProvider(BaseFuzzTaskProvider): + """Fuzz task provider for Chrome.""" + + _candidates: list[FuzzTaskCandidate] + + def __init__(self, jobs: list[data_types.Job]): + self._candidates = [ + FuzzTaskCandidate( + job=job.name, + project=job.project, + base_os_version=job.base_os_version) + for job in jobs + if job.platform in ['LINUX', 'ANDROID'] + ] def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: """Returns fuzz tasks for chrome, weighted by job weight.""" logs.info('Getting jobs for Chrome.') - candidates_by_job = {} - # Only consider LINUX or ANDROID jobs - job_query = data_types.Job.query( - data_types.Job.platform.IN(['LINUX', 'ANDROID'])) - for job in ndb_utils.get_all_from_query(job_query): - base_os_version = None - if job.base_os_version: - base_os_version = job.base_os_version - - candidates_by_job[job.name] = FuzzTaskCandidate( - job=job.name, project=job.project, base_os_version=base_os_version) + candidates_by_job = {c.job: c for c in self._candidates} fuzz_task_candidates = [] fuzzer_job_query = ndb_utils.get_all_from_query( @@ -272,28 +308,26 @@ def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: ] return fuzz_tasks - def _schedule_swarming_fuzz_tasks(self) -> bool: - if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: - return False - swarming_preprocess_queue = tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE] - return self._schedule_fuzz_tasks( - queue=swarming_preprocess_queue, - default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, - target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT) +def schedule_chrome_fuzz_tasks(): + """Schedules fuzz tasks for Chrome.""" + default_provider = ChromeFuzzTaskProvider(_get_jobs_for_platform('LINUX')) + _fill_queue(_DEFAULT_QUEUE, default_provider) - def _schedule_batch_fuzz_tasks(self) -> bool: - return self._schedule_fuzz_tasks() + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return - def schedule_fuzz_tasks(self) -> bool: - self._schedule_swarming_fuzz_tasks() - self._schedule_batch_fuzz_tasks() - return True + swarming_provider = ChromeFuzzTaskProvider(_get_swarming_jobs()) + _fill_queue(_SWARMING_QUEUE, swarming_provider) -def main(): +def schedule_fuzz_tasks(): + """Schedules fuzz tasks based on deployment type.""" if utils.is_oss_fuzz(): - scheduler = OssfuzzFuzzTaskScheduler() + _fill_queue(_DEFAULT_QUEUE, OssfuzzFuzzTaskProvider()) else: - scheduler = ChromeFuzzTaskScheduler() - return scheduler.schedule_fuzz_tasks() + schedule_chrome_fuzz_tasks() + + +def main(): + schedule_fuzz_tasks() diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 0369abcb62c..c9d5b8e5301 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -37,6 +37,14 @@ ] +def has_swarming_env_vars(job_environment: dict) -> bool: + """Returns True if the job environment contains swarming env vars.""" + return bool( + job_environment and + (utils.string_is_true(job_environment.get('IS_SWARMING_JOB')) or + job_environment.get('SWARMING_DIMENSIONS'))) + + def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: """Returns True if the task is supposed to run on swarming.""" if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: @@ -48,9 +56,7 @@ def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: logs.info('[Swarming DEBUG] Job not found', job_name=job_name) return False - job_environment = job.get_environment() - if not utils.string_is_true(job_environment.get( - 'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'): + if not has_swarming_env_vars(job.get_environment()): logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name) return False diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index 9caa08c7aba..3b5b52510ca 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self): data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() num_tasks = 5 - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() - tasks = scheduler.get_fuzz_tasks(num_tasks) + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks) comparable_results = [] for task in tasks: comparable_results.append((task.command, task.argument, task.job)) @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self): name=project_name, base_os_version='project-version').put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() - tasks = scheduler.get_fuzz_tasks(num_tasks=1) + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() - tasks = scheduler.get_fuzz_tasks(num_tasks=1) + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -186,8 +186,8 @@ def test_os_version_no_version(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() - tasks = scheduler.get_fuzz_tasks(num_tasks=1) + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -216,8 +216,9 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() - tasks = scheduler.get_fuzz_tasks(num_tasks=1) + jobs = list(data_types.Job.query()) + provider = schedule_fuzz.ChromeFuzzTaskProvider(jobs) + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) return tasks[0] @@ -245,6 +246,7 @@ def test_job_filtering(self): job='windows_job', platform='WINDOWS', fuzzer='libFuzzer', weight=1.0).put() - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() - tasks = scheduler.get_fuzz_tasks(num_tasks=1) + jobs = list(data_types.Job.query()) + provider = schedule_fuzz.ChromeFuzzTaskProvider(jobs) + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 0) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index c48d154058a..dac02170ee8 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -50,6 +50,40 @@ def setUp(self): os.environ.pop('PROJECT_NAME', None) os.environ.pop('HOST_JOB_SELECTION', None) + def test_has_swarming_env_vars_empty_env(self): + """Tests that empty environment returns False.""" + self.assertFalse(swarming.has_swarming_env_vars({})) + self.assertFalse(swarming.has_swarming_env_vars(None)) + + def test_has_swarming_env_vars_is_swarming_job_true(self): + """Tests that IS_SWARMING_JOB=True returns True.""" + self.assertTrue(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': 'True'})) + self.assertTrue(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': 'true'})) + self.assertTrue(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': '1'})) + + def test_has_swarming_env_vars_is_swarming_job_false(self): + """Tests that IS_SWARMING_JOB=False returns False.""" + self.assertFalse( + swarming.has_swarming_env_vars({ + 'IS_SWARMING_JOB': 'False' + })) + self.assertFalse(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': '0'})) + + def test_has_swarming_env_vars_swarming_dimensions(self): + """Tests that SWARMING_DIMENSIONS returns True.""" + self.assertTrue( + swarming.has_swarming_env_vars({ + 'SWARMING_DIMENSIONS': 'os:linux' + })) + + def test_has_swarming_env_vars_both(self): + """Tests that both variables return True.""" + self.assertTrue( + swarming.has_swarming_env_vars({ + 'IS_SWARMING_JOB': 'True', + 'SWARMING_DIMENSIONS': 'os:linux' + })) + def test_get_spec_from_config_with_docker_image(self): """Tests that create_new_task_request works as expected.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') From d8614e781ea25e92ebfdf4bfe0b135d3d8917f24 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 22 May 2026 15:58:14 +0000 Subject: [PATCH 5/5] Moves weight logic outside of provider --- .../_internal/cron/schedule_fuzz.py | 128 ++++++++++-------- .../_internal/swarming/__init__.py | 5 +- .../handlers/cron/schedule_fuzz_test.py | 20 +-- .../tests/core/swarming/swarming_test.py | 9 +- 4 files changed, 82 insertions(+), 80 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index e72c860f2d6..9ebd61f98ac 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -13,6 +13,8 @@ # limitations under the License. """Cron job to schedule fuzz tasks that run on batch.""" +from abc import ABC +from abc import abstractmethod import collections from dataclasses import dataclass import random @@ -36,6 +38,7 @@ @dataclass class Queue: + """Data class that holds information about a pub/subqueue.""" name: str default_target_size: int target_size_flag: FeatureFlags @@ -54,17 +57,16 @@ class Queue: ) -def _get_jobs_for_platform(platform: str) -> list[data_types.Job]: - """Returns all jobs for the given platform.""" +def _get_jobs_for_platforms(platforms: list[str]) -> list[data_types.Job]: + """Returns all jobs for the given platforms.""" return ndb_utils.get_all_from_query( - data_types.Job.query(data_types.Job.platform == platform)) + data_types.Job.query(data_types.Job.platform.IN(platforms))) def _get_swarming_jobs(): """Returns all jobs that have swarming environment variables.""" jobs = [] - jobs.extend(_get_jobs_for_platform('ANDROID')) - jobs.extend(_get_jobs_for_platform('LINUX')) + jobs.extend(_get_jobs_for_platforms(['ANDROID', 'LINUX'])) return [ job for job in jobs if swarming.has_swarming_env_vars(job.get_environment()) @@ -88,29 +90,6 @@ def _remaining_queue_capacity(queue: Queue) -> int: return num_tasks -def _fill_queue(queue: Queue, provider: 'BaseFuzzTaskProvider'): - """Fills the given queue with tasks from the provider.""" - start = time.time() - num_tasks = _remaining_queue_capacity(queue) - - if num_tasks <= 0: - logs.info('Queue size met or exceeded. Not scheduling tasks.') - return - - fuzz_tasks = provider.get_fuzz_tasks(num_tasks) - if not fuzz_tasks: - logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.') - return - - logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') - tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') - - end = time.time() - total = end - start - logs.info(f'Task scheduling took {total} seconds.') - - @memoize.wrap(memoize.InMemory(60)) def get_queue_size(creds, project_id, subscription_id): """Returns the size of the queue (unacked messages).""" @@ -142,11 +121,12 @@ def get_queue_size(creds, project_id, subscription_id): return 0 -class BaseFuzzTaskProvider(): +class BaseFuzzTaskProvider(ABC): """Base fuzz task provider for any deployment of ClusterFuzz.""" + @abstractmethod def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: - raise NotImplementedError('Child class must implement.') + """Returns a list of fuzz tasks.""" class FuzzTaskCandidate: @@ -175,6 +155,55 @@ def copy(self): base_os_version=self.base_os_version) +def _fill_queue(queue: Queue, provider: BaseFuzzTaskProvider): + """Fills the given queue with tasks from the provider.""" + start = time.time() + num_tasks = _remaining_queue_capacity(queue) + + if num_tasks <= 0: + logs.info('Queue size met or exceeded. Not scheduling tasks.') + return + + fuzz_tasks = provider.get_fuzz_tasks(num_tasks) + if not fuzz_tasks: + logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.') + return + + logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') + tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') + + end = time.time() + total = end - start + logs.info(f'Task scheduling took {total} seconds.') + + +def _create_candidates_from_jobs( + jobs: list[data_types.Job]) -> list[FuzzTaskCandidate]: + """Create candidates from jobs & assign weights to them.""" + candidates_by_job = { + job.name: FuzzTaskCandidate( + job=job.name, + project=job.project, + base_os_version=job.base_os_version) for job in jobs + } + + if not candidates_by_job: + return [] + + fuzzer_job_query = ndb_utils.get_all_from_query( + data_types.FuzzerJob.query( + data_types.FuzzerJob.job.IN(list(candidates_by_job.keys())))) + fuzz_task_candidates = [] + for fuzzer_job in fuzzer_job_query: + candidate = candidates_by_job[fuzzer_job.job].copy() + candidate.fuzzer = fuzzer_job.fuzzer + candidate.weight = fuzzer_job.actual_weight + fuzz_task_candidates.append(candidate) + + return fuzz_task_candidates + + class OssfuzzFuzzTaskProvider(BaseFuzzTaskProvider): """Fuzz task provider for OSS-Fuzz.""" @@ -263,41 +292,20 @@ class ChromeFuzzTaskProvider(BaseFuzzTaskProvider): _candidates: list[FuzzTaskCandidate] - def __init__(self, jobs: list[data_types.Job]): - self._candidates = [ - FuzzTaskCandidate( - job=job.name, - project=job.project, - base_os_version=job.base_os_version) - for job in jobs - if job.platform in ['LINUX', 'ANDROID'] - ] + def __init__(self, candidates: list[FuzzTaskCandidate]): + self._candidates = candidates def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: """Returns fuzz tasks for chrome, weighted by job weight.""" logs.info('Getting jobs for Chrome.') - candidates_by_job = {c.job: c for c in self._candidates} - - fuzz_task_candidates = [] - fuzzer_job_query = ndb_utils.get_all_from_query( - data_types.FuzzerJob.query()) - - for fuzzer_job in fuzzer_job_query: - if fuzzer_job.job not in candidates_by_job: - continue - fuzz_task_candidate = candidates_by_job[fuzzer_job.job].copy() - fuzz_task_candidate.fuzzer = fuzzer_job.fuzzer - fuzz_task_candidate.weight = fuzzer_job.actual_weight - fuzz_task_candidates.append(fuzz_task_candidate) - - weights = [candidate.weight for candidate in fuzz_task_candidates] + weights = [candidate.weight for candidate in self._candidates] logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') - if not fuzz_task_candidates: + if not self._candidates: return [] - choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) + choices = random.choices(self._candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -311,13 +319,17 @@ def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: def schedule_chrome_fuzz_tasks(): """Schedules fuzz tasks for Chrome.""" - default_provider = ChromeFuzzTaskProvider(_get_jobs_for_platform('LINUX')) + default_jobs = _get_jobs_for_platforms(['LINUX']) + default_candidates = _create_candidates_from_jobs(default_jobs) + default_provider = ChromeFuzzTaskProvider(default_candidates) _fill_queue(_DEFAULT_QUEUE, default_provider) if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: return - swarming_provider = ChromeFuzzTaskProvider(_get_swarming_jobs()) + swarming_jobs = _get_swarming_jobs() + swarming_candidates = _create_candidates_from_jobs(swarming_jobs) + swarming_provider = ChromeFuzzTaskProvider(swarming_candidates) _fill_queue(_SWARMING_QUEUE, swarming_provider) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index c9d5b8e5301..160270d791c 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -40,9 +40,8 @@ def has_swarming_env_vars(job_environment: dict) -> bool: """Returns True if the job environment contains swarming env vars.""" return bool( - job_environment and - (utils.string_is_true(job_environment.get('IS_SWARMING_JOB')) or - job_environment.get('SWARMING_DIMENSIONS'))) + utils.string_is_true(job_environment.get('IS_SWARMING_JOB')) or + job_environment.get('SWARMING_DIMENSIONS')) def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index 3b5b52510ca..8118b79cfff 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -217,7 +217,8 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" jobs = list(data_types.Job.query()) - provider = schedule_fuzz.ChromeFuzzTaskProvider(jobs) + candidates = schedule_fuzz._create_candidates_from_jobs(jobs) + provider = schedule_fuzz.ChromeFuzzTaskProvider(candidates) tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) return tasks[0] @@ -233,20 +234,3 @@ def test_os_version_job_without_version(self): self._setup_chrome_entities() task = self._run_and_get_task() self.assertIsNone(task.extra_info.get('base_os_version')) - - def test_job_filtering(self): - """Tests that jobs are filtered by platform""" - # Test wrong platform. - data_types.Job( - name='windows_job', - project='chrome', - platform='WINDOWS', - environment_string='IS_SWARMING_JOB = True').put() - data_types.FuzzerJob( - job='windows_job', platform='WINDOWS', fuzzer='libFuzzer', - weight=1.0).put() - - jobs = list(data_types.Job.query()) - provider = schedule_fuzz.ChromeFuzzTaskProvider(jobs) - tasks = provider.get_fuzz_tasks(num_tasks=1) - self.assertEqual(len(tasks), 0) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index dac02170ee8..da1d1b18c71 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -53,7 +53,6 @@ def setUp(self): def test_has_swarming_env_vars_empty_env(self): """Tests that empty environment returns False.""" self.assertFalse(swarming.has_swarming_env_vars({})) - self.assertFalse(swarming.has_swarming_env_vars(None)) def test_has_swarming_env_vars_is_swarming_job_true(self): """Tests that IS_SWARMING_JOB=True returns True.""" @@ -76,6 +75,14 @@ def test_has_swarming_env_vars_swarming_dimensions(self): 'SWARMING_DIMENSIONS': 'os:linux' })) + def test_has_swarming_env_vars_empty_dimensions_and_false_job(self): + """Tests that SWARMING_DIMENSIONS={} and IS_SWARMING_JOB=False returns False.""" + self.assertFalse( + swarming.has_swarming_env_vars({ + 'SWARMING_DIMENSIONS': {}, + 'IS_SWARMING_JOB': 'False' + })) + def test_has_swarming_env_vars_both(self): """Tests that both variables return True.""" self.assertTrue(