diff --git a/src/clusterfuzz/_internal/base/feature_flags.py b/src/clusterfuzz/_internal/base/feature_flags.py index 9bea8c41f9..842bec18ae 100644 --- a/src/clusterfuzz/_internal/base/feature_flags.py +++ b/src/clusterfuzz/_internal/base/feature_flags.py @@ -36,6 +36,8 @@ class FeatureFlags(Enum): PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit' SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution' + # TODO(b/516630567): Set this value based off dev & stage metrics and tests. + SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit' @property def flag(self): diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ae0c4e9ed0..7b054d2ee9 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -101,6 +101,11 @@ def get_task_duration(command): UTASK_MAIN_QUEUE = 'utask_main' PREPROCESS_QUEUE = 'preprocess' +SWARMING_QUEUES = { + PREPROCESS_QUEUE: 'preprocess-swarming', + UTASK_MAIN_QUEUE: 'utask_main-swarming', +} + # See https://github.com/google/clusterfuzz/issues/3347 for usage SUBQUEUE_IDENTIFIER = ':' diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index e032923bad..8364c7d1ee 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -13,22 +13,48 @@ # limitations under the License. """Cron job to schedule fuzz tasks that run on batch.""" +from abc import ABC +from abc import abstractmethod import collections +from dataclasses import dataclass import random import time from google.cloud import monitoring_v3 -from clusterfuzz._internal.base import feature_flags +from clusterfuzz._internal import swarming from clusterfuzz._internal.base import memoize from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils +from clusterfuzz._internal.base.feature_flags import FeatureFlags from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs PREPROCESS_TARGET_SIZE_DEFAULT = 10000 +SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 + + +@dataclass +class Queue: + """Data class that holds information about a pub/subqueue.""" + name: str + default_target_size: int + target_size_flag: FeatureFlags + + +_DEFAULT_QUEUE = Queue( + name=tasks.PREPROCESS_QUEUE, + default_target_size=PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT, +) + +_SWARMING_QUEUE = Queue( + name=tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE], + default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT, +) @memoize.wrap(memoize.InMemory(60)) @@ -62,14 +88,12 @@ def get_queue_size(creds, project_id, subscription_id): return 0 -class BaseFuzzTaskScheduler: - """Base fuzz task scheduler for any deployment of ClusterFuzz.""" - - def __init__(self, num_tasks): - self.num_tasks = num_tasks +class BaseFuzzTaskProvider(ABC): + """Base fuzz task provider for any deployment of ClusterFuzz.""" - def get_fuzz_tasks(self): - raise NotImplementedError('Child class must implement.') + @abstractmethod + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: + """Returns a list of fuzz tasks.""" class FuzzTaskCandidate: @@ -98,10 +122,10 @@ def copy(self): base_os_version=self.base_os_version) -class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): - """Fuzz task scheduler for OSS-Fuzz.""" +class OssfuzzFuzzTaskProvider(BaseFuzzTaskProvider): + """Fuzz task provider for OSS-Fuzz.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: # TODO(metzman): Handle high end. # A job's weight is determined by its own weight and the weight of the # project is a part of. First get project weights. @@ -164,11 +188,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: for fuzz_task_candidate in fuzz_task_candidates: weights.append(fuzz_task_candidate.weight) - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.') - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -183,45 +205,25 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: return fuzz_tasks -class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): - """Fuzz task scheduler for Chrome.""" - - def get_fuzz_tasks(self) -> list[tasks.Task]: - """Returns fuzz tasks for chrome, weighted by job weight.""" - logs.info('Getting jobs for Chrome.') - - candidates_by_job = {} - # Only consider linux jobs for chrome fuzzing. - job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') - for job in ndb_utils.get_all_from_query(job_query): - base_os_version = None - if job.base_os_version: - base_os_version = job.base_os_version +class ChromeFuzzTaskProvider(BaseFuzzTaskProvider): + """Fuzz task provider for Chrome.""" - candidates_by_job[job.name] = FuzzTaskCandidate( - job=job.name, project=job.project, base_os_version=base_os_version) + _candidates: list[FuzzTaskCandidate] - fuzz_task_candidates = [] - fuzzer_job_query = ndb_utils.get_all_from_query( - data_types.FuzzerJob.query()) + def __init__(self, candidates: list[FuzzTaskCandidate]): + self._candidates = candidates - for fuzzer_job in fuzzer_job_query: - if fuzzer_job.job not in candidates_by_job: - continue - fuzz_task_candidate = candidates_by_job[fuzzer_job.job].copy() - fuzz_task_candidate.fuzzer = fuzzer_job.fuzzer - fuzz_task_candidate.weight = fuzzer_job.actual_weight - fuzz_task_candidates.append(fuzz_task_candidate) + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: + """Returns fuzz tasks for chrome, weighted by job weight.""" + logs.info('Getting jobs for Chrome.') - weights = [candidate.weight for candidate in fuzz_task_candidates] - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') + weights = [candidate.weight for candidate in self._candidates] + logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') - if not fuzz_task_candidates: + if not self._candidates: return [] - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(self._candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -233,50 +235,111 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: return fuzz_tasks -def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: - if utils.is_oss_fuzz(): - scheduler = OssfuzzFuzzTaskScheduler(num_tasks) - else: - scheduler = ChromeFuzzTaskScheduler(num_tasks) - fuzz_tasks = scheduler.get_fuzz_tasks() - return fuzz_tasks +def _get_jobs_for_platforms(platforms: list[str]) -> list[data_types.Job]: + """Returns all jobs for the given platforms.""" + return ndb_utils.get_all_from_query( + data_types.Job.query(data_types.Job.platform.IN(platforms))) + +def _get_swarming_jobs(): + """Returns all jobs that have swarming environment variables.""" + jobs = [] + jobs.extend(_get_jobs_for_platforms(['ANDROID', 'LINUX'])) + return [ + job for job in jobs + if swarming.has_swarming_env_vars(job.get_environment()) + ] -def schedule_fuzz_tasks() -> bool: - """Schedules fuzz tasks.""" +def _remaining_queue_capacity(queue: Queue) -> int: + """Returns the remaining capacity of the given queue.""" project = utils.get_application_id() - start = time.time() creds = credentials.get_default()[0] - preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) + preprocess_queue_size = get_queue_size(creds, project, queue.name) - target_size = PREPROCESS_TARGET_SIZE_DEFAULT - target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT - if target_size_flag.enabled and target_size_flag.content: - target_size = int(target_size_flag.content) + target_size = queue.default_target_size + if queue.target_size_flag.enabled and queue.target_size_flag.content: + target_size = int(queue.target_size_flag.content) num_tasks = target_size - preprocess_queue_size - logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' + logs.info(f'Queue {queue.name} size: {preprocess_queue_size}. ' f'Target: {target_size}. Needed: {num_tasks}.') + return num_tasks + + +def _fill_queue(queue: Queue, provider: BaseFuzzTaskProvider): + """Fills the given queue with tasks from the provider.""" + start = time.time() + num_tasks = _remaining_queue_capacity(queue) + if num_tasks <= 0: logs.info('Queue size met or exceeded. Not scheduling tasks.') - return False + return - fuzz_tasks = get_fuzz_tasks(num_tasks) + fuzz_tasks = provider.get_fuzz_tasks(num_tasks) if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False + logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.') + return - logs.info(f'Adding {fuzz_tasks} to preprocess queue.') - tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') + logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') + tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') end = time.time() total = end - start logs.info(f'Task scheduling took {total} seconds.') - return True + + +def _create_candidates_from_jobs( + jobs: list[data_types.Job]) -> list[FuzzTaskCandidate]: + """Create candidates from jobs & assign weights to them.""" + candidates_by_job = { + job.name: FuzzTaskCandidate( + job=job.name, + project=job.project, + base_os_version=job.base_os_version) for job in jobs + } + + if not candidates_by_job: + return [] + + fuzzer_job_query = ndb_utils.get_all_from_query( + data_types.FuzzerJob.query( + data_types.FuzzerJob.job.IN(list(candidates_by_job.keys())))) + fuzz_task_candidates = [] + for fuzzer_job in fuzzer_job_query: + candidate = candidates_by_job[fuzzer_job.job].copy() + candidate.fuzzer = fuzzer_job.fuzzer + candidate.weight = fuzzer_job.actual_weight + fuzz_task_candidates.append(candidate) + + return fuzz_task_candidates + + +def schedule_chrome_fuzz_tasks(): + """Schedules fuzz tasks for Chrome.""" + default_jobs = _get_jobs_for_platforms(['LINUX']) + default_candidates = _create_candidates_from_jobs(default_jobs) + default_provider = ChromeFuzzTaskProvider(default_candidates) + _fill_queue(_DEFAULT_QUEUE, default_provider) + + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return + + swarming_jobs = _get_swarming_jobs() + swarming_candidates = _create_candidates_from_jobs(swarming_jobs) + swarming_provider = ChromeFuzzTaskProvider(swarming_candidates) + _fill_queue(_SWARMING_QUEUE, swarming_provider) + + +def schedule_fuzz_tasks(): + """Schedules fuzz tasks based on deployment type.""" + if utils.is_oss_fuzz(): + _fill_queue(_DEFAULT_QUEUE, OssfuzzFuzzTaskProvider()) + else: + schedule_chrome_fuzz_tasks() def main(): - return schedule_fuzz_tasks() + schedule_fuzz_tasks() diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 0369abcb62..160270d791 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -37,6 +37,13 @@ ] +def has_swarming_env_vars(job_environment: dict) -> bool: + """Returns True if the job environment contains swarming env vars.""" + return bool( + utils.string_is_true(job_environment.get('IS_SWARMING_JOB')) or + job_environment.get('SWARMING_DIMENSIONS')) + + def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: """Returns True if the task is supposed to run on swarming.""" if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: @@ -48,9 +55,7 @@ def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: logs.info('[Swarming DEBUG] Job not found', job_name=job_name) return False - job_environment = job.get_environment() - if not utils.string_is_true(job_environment.get( - 'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'): + if not has_swarming_env_vars(job.get_environment()): logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name) return False diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index ae23d78fd0..8118b79cff 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self): data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() num_tasks = 5 - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks) - tasks = scheduler.get_fuzz_tasks() + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks) comparable_results = [] for task in tasks: comparable_results.append((task.command, task.argument, task.job)) @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self): name=project_name, base_os_version='project-version').put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -186,8 +186,8 @@ def test_os_version_no_version(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + provider = schedule_fuzz.OssfuzzFuzzTaskProvider() + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -216,8 +216,10 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + jobs = list(data_types.Job.query()) + candidates = schedule_fuzz._create_candidates_from_jobs(jobs) + provider = schedule_fuzz.ChromeFuzzTaskProvider(candidates) + tasks = provider.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) return tasks[0] diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index c48d154058..da1d1b18c7 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -50,6 +50,47 @@ def setUp(self): os.environ.pop('PROJECT_NAME', None) os.environ.pop('HOST_JOB_SELECTION', None) + def test_has_swarming_env_vars_empty_env(self): + """Tests that empty environment returns False.""" + self.assertFalse(swarming.has_swarming_env_vars({})) + + def test_has_swarming_env_vars_is_swarming_job_true(self): + """Tests that IS_SWARMING_JOB=True returns True.""" + self.assertTrue(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': 'True'})) + self.assertTrue(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': 'true'})) + self.assertTrue(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': '1'})) + + def test_has_swarming_env_vars_is_swarming_job_false(self): + """Tests that IS_SWARMING_JOB=False returns False.""" + self.assertFalse( + swarming.has_swarming_env_vars({ + 'IS_SWARMING_JOB': 'False' + })) + self.assertFalse(swarming.has_swarming_env_vars({'IS_SWARMING_JOB': '0'})) + + def test_has_swarming_env_vars_swarming_dimensions(self): + """Tests that SWARMING_DIMENSIONS returns True.""" + self.assertTrue( + swarming.has_swarming_env_vars({ + 'SWARMING_DIMENSIONS': 'os:linux' + })) + + def test_has_swarming_env_vars_empty_dimensions_and_false_job(self): + """Tests that SWARMING_DIMENSIONS={} and IS_SWARMING_JOB=False returns False.""" + self.assertFalse( + swarming.has_swarming_env_vars({ + 'SWARMING_DIMENSIONS': {}, + 'IS_SWARMING_JOB': 'False' + })) + + def test_has_swarming_env_vars_both(self): + """Tests that both variables return True.""" + self.assertTrue( + swarming.has_swarming_env_vars({ + 'IS_SWARMING_JOB': 'True', + 'SWARMING_DIMENSIONS': 'os:linux' + })) + def test_get_spec_from_config_with_docker_image(self): """Tests that create_new_task_request works as expected.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')