[Swarming] Push preprocess tasks to swarming queue #5282
base: master
Changes from all commits
99bf603
56e6692
07bae18
de19568
e977447
d8614e7
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,8 @@ class FeatureFlags(Enum): | |
| PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit' | ||
|
|
||
| SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution' | ||
| # TODO(ibarba): Set this value based off dev & stage metrics and tests. | ||
|
Collaborator
TODO should reference a bug link instead.
Collaborator
Author
I also agree it's best to link to a bug. I previously asked Gemini whether we can add go links or bug references in open source code/reviews on GitHub, and basically it said: [...] I'm aware AI can hallucinate and I'm not an open source expert, but as far as I know, that's the reason this repo is full of [...] So it's your call: can we include bug links here?
Collaborator
@javanlacerda or @ViniciustCosta would know I'm sure!
||
| SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit' | ||
|
|
||
| @property | ||
| def flag(self): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,22 +13,81 @@ | |
| # limitations under the License. | ||
| """Cron job to schedule fuzz tasks that run on batch.""" | ||
|
|
||
| from abc import ABC | ||
| from abc import abstractmethod | ||
| import collections | ||
| from dataclasses import dataclass | ||
| import random | ||
| import time | ||
|
|
||
| from google.cloud import monitoring_v3 | ||
|
|
||
| from clusterfuzz._internal.base import feature_flags | ||
| from clusterfuzz._internal import swarming | ||
| from clusterfuzz._internal.base import memoize | ||
| from clusterfuzz._internal.base import tasks | ||
| from clusterfuzz._internal.base import utils | ||
| from clusterfuzz._internal.base.feature_flags import FeatureFlags | ||
| from clusterfuzz._internal.datastore import data_types | ||
| from clusterfuzz._internal.datastore import ndb_utils | ||
| from clusterfuzz._internal.google_cloud_utils import credentials | ||
| from clusterfuzz._internal.metrics import logs | ||
|
|
||
| PREPROCESS_TARGET_SIZE_DEFAULT = 10000 | ||
| SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 | ||
|
Collaborator
Author
The Swarming pool has a hard limit of 25 (LINUX) bots, each running 1 task.
So injecting 5 tasks every 10 minutes matches the expected Swarming rate and prevents an infinitely growing backlog of stale tasks. This is only the default; the real value is managed through a feature flag, which we will tune later based on metrics and on how Swarming handles this workload, so that we end up with a more accurate value.
Collaborator
I think we should prevent the infinitely-growing queue of tasks using
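The sizing argument in the thread above can be checked with a little arithmetic. This is a minimal sketch that assumes only the figures quoted in the comment (25 bots, one task per bot, 5 tasks injected every 10 minutes). The function names are hypothetical, and the average task duration is not stated anywhere in the PR, so the sketch solves for it instead of assuming a value.

```python
def injection_rate_per_hour(tasks_per_run: int,
                            interval_minutes: float) -> float:
    """Tasks added per hour when the cron adds a fixed batch each run."""
    return tasks_per_run * 60 / interval_minutes


def break_even_task_minutes(bots: int, tasks_per_run: int,
                            interval_minutes: float) -> float:
    """Longest average task duration the pool can absorb without a backlog.

    Each bot runs one task at a time, so the pool finishes
    bots / duration tasks per minute. Setting that equal to the injection
    rate (tasks_per_run / interval_minutes) and solving for duration gives
    the break-even point.
    """
    return bots * interval_minutes / tasks_per_run


if __name__ == '__main__':
    print(injection_rate_per_hour(5, 10))
    print(break_even_task_minutes(25, 5, 10))
```

With these numbers the cron adds 30 tasks per hour, which 25 bots can keep up with as long as tasks average 50 minutes or less. Longer tasks would let the backlog grow, which is presumably why the value is left tunable through the feature flag.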
||
|
|
||
|
|
||
| @dataclass | ||
|
IvanBM18 marked this conversation as resolved.
|
||
| class Queue: | ||
| """Data class that holds information about a Pub/Sub queue.""" | ||
| name: str | ||
| default_target_size: int | ||
| target_size_flag: FeatureFlags | ||
|
|
||
|
|
||
| _DEFAULT_QUEUE = Queue( | ||
| name=tasks.PREPROCESS_QUEUE, | ||
| default_target_size=PREPROCESS_TARGET_SIZE_DEFAULT, | ||
| target_size_flag=FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT, | ||
| ) | ||
|
|
||
| _SWARMING_QUEUE = Queue( | ||
| name=tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE], | ||
| default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, | ||
| target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT, | ||
| ) | ||
|
|
||
|
|
||
| def _get_jobs_for_platforms(platforms: list[str]) -> list[data_types.Job]: | ||
| """Returns all jobs for the given platforms.""" | ||
| return ndb_utils.get_all_from_query( | ||
| data_types.Job.query(data_types.Job.platform.IN(platforms))) | ||
|
|
||
|
|
||
| def _get_swarming_jobs(): | ||
| """Returns all jobs that have swarming environment variables.""" | ||
| jobs = [] | ||
| jobs.extend(_get_jobs_for_platforms(['ANDROID', 'LINUX'])) | ||
| return [ | ||
| job for job in jobs | ||
| if swarming.has_swarming_env_vars(job.get_environment()) | ||
| ] | ||
|
|
||
|
|
||
| def _remaining_queue_capacity(queue: Queue) -> int: | ||
| """Returns the remaining capacity of the given queue.""" | ||
| project = utils.get_application_id() | ||
| creds = credentials.get_default()[0] | ||
| preprocess_queue_size = get_queue_size(creds, project, queue.name) | ||
|
|
||
| target_size = queue.default_target_size | ||
| if queue.target_size_flag.enabled and queue.target_size_flag.content: | ||
| target_size = int(queue.target_size_flag.content) | ||
|
|
||
| num_tasks = target_size - preprocess_queue_size | ||
| logs.info(f'Queue {queue.name} size: {preprocess_queue_size}. ' | ||
| f'Target: {target_size}. Needed: {num_tasks}.') | ||
|
|
||
| return num_tasks | ||
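The capacity calculation in `_remaining_queue_capacity` can be exercised in isolation. The sketch below is not the ClusterFuzz code: `FakeFlag` is a hypothetical stand-in for a `FeatureFlags` member, assumed to expose only the `enabled` and `content` attributes used in the diff, and the current queue size is passed in rather than read from Cloud Monitoring.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeFlag:
    """Hypothetical stand-in for a feature flag with `enabled` and `content`."""
    enabled: bool = False
    content: Optional[str] = None


def resolve_target_size(default: int, flag: FakeFlag) -> int:
    """Uses the flag's content when it is enabled and non-empty."""
    if flag.enabled and flag.content:
        return int(flag.content)
    return default


def remaining_capacity(current_size: int, default: int,
                       flag: FakeFlag) -> int:
    """Number of tasks needed to reach the target size (may be negative)."""
    return resolve_target_size(default, flag) - current_size


if __name__ == '__main__':
    print(remaining_capacity(3, 5, FakeFlag()))
    print(remaining_capacity(3, 5, FakeFlag(enabled=True, content='20')))
```

Note that the result is negative when the queue is already over target; the caller in the diff treats any value `<= 0` as "do not schedule".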
|
|
||
|
|
||
| @memoize.wrap(memoize.InMemory(60)) | ||
|
|
@@ -62,14 +121,12 @@ def get_queue_size(creds, project_id, subscription_id): | |
| return 0 | ||
|
|
||
|
|
||
| class BaseFuzzTaskScheduler: | ||
| """Base fuzz task scheduler for any deployment of ClusterFuzz.""" | ||
| class BaseFuzzTaskProvider(ABC): | ||
| """Base fuzz task provider for any deployment of ClusterFuzz.""" | ||
|
|
||
| def __init__(self, num_tasks): | ||
| self.num_tasks = num_tasks | ||
|
|
||
| def get_fuzz_tasks(self): | ||
| raise NotImplementedError('Child class must implement.') | ||
| @abstractmethod | ||
| def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: | ||
| """Returns a list of fuzz tasks.""" | ||
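Replacing `raise NotImplementedError` with `ABC` and `@abstractmethod` moves the failure from call time to instantiation time. A small sketch of that behaviour follows; the class names are hypothetical and tasks are simplified to plain strings.

```python
from abc import ABC, abstractmethod


class Provider(ABC):
    """Simplified provider interface mirroring the shape used in the diff."""

    @abstractmethod
    def get_fuzz_tasks(self, num_tasks: int) -> list[str]:
        """Returns a list of fuzz tasks."""


class IncompleteProvider(Provider):
    """Forgets to implement get_fuzz_tasks."""


class ConstantProvider(Provider):
    """Implements the interface with placeholder tasks."""

    def get_fuzz_tasks(self, num_tasks: int) -> list[str]:
        return ['fuzz'] * num_tasks


def can_instantiate(cls) -> bool:
    """Returns False if Python refuses to instantiate an abstract class."""
    try:
        cls()
    except TypeError:
        return False
    return True


if __name__ == '__main__':
    print(can_instantiate(IncompleteProvider))
    print(can_instantiate(ConstantProvider))
```

A subclass that omits `get_fuzz_tasks` is rejected as soon as it is constructed, so a missing implementation surfaces before any task is scheduled.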
|
|
||
|
|
||
| class FuzzTaskCandidate: | ||
|
|
@@ -98,10 +155,59 @@ def copy(self): | |
| base_os_version=self.base_os_version) | ||
|
|
||
|
|
||
| class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): | ||
| """Fuzz task scheduler for OSS-Fuzz.""" | ||
| def _fill_queue(queue: Queue, provider: BaseFuzzTaskProvider): | ||
| """Fills the given queue with tasks from the provider.""" | ||
| start = time.time() | ||
| num_tasks = _remaining_queue_capacity(queue) | ||
|
|
||
| def get_fuzz_tasks(self) -> list[tasks.Task]: | ||
| if num_tasks <= 0: | ||
| logs.info('Queue size met or exceeded. Not scheduling tasks.') | ||
| return | ||
|
|
||
| fuzz_tasks = provider.get_fuzz_tasks(num_tasks) | ||
| if not fuzz_tasks: | ||
| logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.') | ||
| return | ||
|
|
||
| logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue.name}.') | ||
| tasks.bulk_add_tasks(fuzz_tasks, queue=queue.name, eta_now=True) | ||
| logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue.name}.') | ||
|
|
||
| end = time.time() | ||
| total = end - start | ||
| logs.info(f'Task scheduling took {total} seconds.') | ||
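The control flow of `_fill_queue` has two early exits: no remaining capacity, and a provider that returns nothing. A minimal sketch of that flow, with the queue and provider replaced by plain callables (all names here are hypothetical and nothing is published to Pub/Sub):

```python
from typing import Callable


def fill_queue(capacity: int,
               get_tasks: Callable[[int], list[str]],
               publish: Callable[[list[str]], None]) -> int:
    """Tops a queue up and returns the number of tasks published."""
    if capacity <= 0:
        # Queue is at or above its target size; nothing to do.
        return 0

    new_tasks = get_tasks(capacity)
    if not new_tasks:
        # The provider had no candidates to offer.
        return 0

    publish(new_tasks)
    return len(new_tasks)


if __name__ == '__main__':
    published: list[str] = []
    count = fill_queue(3, lambda n: ['fuzz'] * n, published.extend)
    print(count, published)
```

Keeping the provider and the publish step as parameters is what lets the same function serve both the default and the Swarming queue in the diff.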
|
|
||
|
|
||
| def _create_candidates_from_jobs( | ||
| jobs: list[data_types.Job]) -> list[FuzzTaskCandidate]: | ||
| """Create candidates from jobs & assign weights to them.""" | ||
| candidates_by_job = { | ||
| job.name: FuzzTaskCandidate( | ||
| job=job.name, | ||
| project=job.project, | ||
| base_os_version=job.base_os_version) for job in jobs | ||
| } | ||
|
|
||
| if not candidates_by_job: | ||
| return [] | ||
|
|
||
| fuzzer_job_query = ndb_utils.get_all_from_query( | ||
| data_types.FuzzerJob.query( | ||
| data_types.FuzzerJob.job.IN(list(candidates_by_job.keys())))) | ||
| fuzz_task_candidates = [] | ||
| for fuzzer_job in fuzzer_job_query: | ||
| candidate = candidates_by_job[fuzzer_job.job].copy() | ||
| candidate.fuzzer = fuzzer_job.fuzzer | ||
| candidate.weight = fuzzer_job.actual_weight | ||
| fuzz_task_candidates.append(candidate) | ||
|
|
||
| return fuzz_task_candidates | ||
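`_create_candidates_from_jobs` is essentially a join: one template candidate per job, copied once for every fuzzer paired with that job. The sketch below reproduces that shape with plain data instead of Datastore entities. `Candidate` and `build_candidates` are hypothetical names, and skipping unknown job names stands in for the `IN(...)` filter that the real query applies.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class Candidate:
    """Simplified stand-in for FuzzTaskCandidate."""
    job: str
    project: str
    fuzzer: Optional[str] = None
    weight: float = 0.0


def build_candidates(
        jobs: list[tuple[str, str]],
        fuzzer_jobs: list[tuple[str, str, float]]) -> list[Candidate]:
    """Joins (job, project) pairs with (fuzzer, job, weight) triples."""
    templates = {
        name: Candidate(job=name, project=project) for name, project in jobs
    }
    if not templates:
        return []

    candidates = []
    for fuzzer, job_name, weight in fuzzer_jobs:
        template = templates.get(job_name)
        if template is None:
            continue  # Fuzzer is paired with a job we were not asked about.
        # Copy the template so each fuzzer gets its own candidate.
        candidates.append(replace(template, fuzzer=fuzzer, weight=weight))
    return candidates


if __name__ == '__main__':
    for candidate in build_candidates(
        [('job_a', 'proj')],
        [('libfuzzer', 'job_a', 2.0), ('afl', 'job_b', 5.0)]):
        print(candidate)
```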
|
|
||
|
|
||
| class OssfuzzFuzzTaskProvider(BaseFuzzTaskProvider): | ||
| """Fuzz task provider for OSS-Fuzz.""" | ||
|
|
||
| def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: | ||
| # TODO(metzman): Handle high end. | ||
| # A job's weight is determined by its own weight and the weight of the | ||
| # project it is a part of. First get project weights. | ||
|
|
@@ -164,11 +270,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| for fuzz_task_candidate in fuzz_task_candidates: | ||
| weights.append(fuzz_task_candidate.weight) | ||
|
|
||
| fuzz_tasks_count = self.num_tasks | ||
| logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') | ||
| logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.') | ||
|
|
||
| choices = random.choices( | ||
| fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) | ||
| choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) | ||
| fuzz_tasks = [ | ||
| tasks.Task( | ||
| 'fuzz', | ||
|
|
@@ -183,45 +287,25 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| return fuzz_tasks | ||
|
|
||
|
|
||
| class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): | ||
| """Fuzz task scheduler for Chrome.""" | ||
| class ChromeFuzzTaskProvider(BaseFuzzTaskProvider): | ||
| """Fuzz task provider for Chrome.""" | ||
|
|
||
| def get_fuzz_tasks(self) -> list[tasks.Task]: | ||
| """Returns fuzz tasks for chrome, weighted by job weight.""" | ||
| logs.info('Getting jobs for Chrome.') | ||
| _candidates: list[FuzzTaskCandidate] | ||
|
|
||
| candidates_by_job = {} | ||
| # Only consider linux jobs for chrome fuzzing. | ||
| job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') | ||
| for job in ndb_utils.get_all_from_query(job_query): | ||
| base_os_version = None | ||
| if job.base_os_version: | ||
| base_os_version = job.base_os_version | ||
|
|
||
| candidates_by_job[job.name] = FuzzTaskCandidate( | ||
| job=job.name, project=job.project, base_os_version=base_os_version) | ||
|
|
||
| fuzz_task_candidates = [] | ||
| fuzzer_job_query = ndb_utils.get_all_from_query( | ||
| data_types.FuzzerJob.query()) | ||
| def __init__(self, candidates: list[FuzzTaskCandidate]): | ||
| self._candidates = candidates | ||
|
|
||
| for fuzzer_job in fuzzer_job_query: | ||
| if fuzzer_job.job not in candidates_by_job: | ||
| continue | ||
| fuzz_task_candidate = candidates_by_job[fuzzer_job.job].copy() | ||
| fuzz_task_candidate.fuzzer = fuzzer_job.fuzzer | ||
| fuzz_task_candidate.weight = fuzzer_job.actual_weight | ||
| fuzz_task_candidates.append(fuzz_task_candidate) | ||
| def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: | ||
| """Returns fuzz tasks for chrome, weighted by job weight.""" | ||
| logs.info('Getting jobs for Chrome.') | ||
|
|
||
| weights = [candidate.weight for candidate in fuzz_task_candidates] | ||
| fuzz_tasks_count = self.num_tasks | ||
| logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') | ||
| weights = [candidate.weight for candidate in self._candidates] | ||
| logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') | ||
|
|
||
| if not fuzz_task_candidates: | ||
| if not self._candidates: | ||
| return [] | ||
|
|
||
| choices = random.choices( | ||
| fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) | ||
| choices = random.choices(self._candidates, weights=weights, k=num_tasks) | ||
| fuzz_tasks = [ | ||
| tasks.Task( | ||
| 'fuzz', | ||
|
|
@@ -233,50 +317,29 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| return fuzz_tasks | ||
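Both providers select tasks with `random.choices`, which samples with replacement according to the given weights. A small sketch of that selection step, using strings in place of candidates and a seeded `random.Random` so the run is repeatable (the `pick` helper is hypothetical):

```python
import random


def pick(candidates: list[str], weights: list[float], k: int,
         rng: random.Random) -> list[str]:
    """Draws k candidates with replacement, proportionally to weights."""
    if not candidates:
        # random.choices raises on an empty population, so guard first,
        # as the Chrome provider in the diff does.
        return []
    return rng.choices(candidates, weights=weights, k=k)


if __name__ == '__main__':
    rng = random.Random(0)
    picks = pick(['a', 'b', 'c'], [0, 3, 1], 1000, rng)
    print(len(picks), 'a' in picks)
```

Because sampling is with replacement, `k` may exceed the number of candidates, which is what allows a small set of jobs to fill a large queue. A candidate whose weight is zero is never drawn.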
|
|
||
|
|
||
| def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: | ||
| if utils.is_oss_fuzz(): | ||
| scheduler = OssfuzzFuzzTaskScheduler(num_tasks) | ||
| else: | ||
| scheduler = ChromeFuzzTaskScheduler(num_tasks) | ||
| fuzz_tasks = scheduler.get_fuzz_tasks() | ||
| return fuzz_tasks | ||
|
|
||
|
|
||
| def schedule_fuzz_tasks() -> bool: | ||
| """Schedules fuzz tasks.""" | ||
| def schedule_chrome_fuzz_tasks(): | ||
| """Schedules fuzz tasks for Chrome.""" | ||
| default_jobs = _get_jobs_for_platforms(['LINUX']) | ||
| default_candidates = _create_candidates_from_jobs(default_jobs) | ||
| default_provider = ChromeFuzzTaskProvider(default_candidates) | ||
| _fill_queue(_DEFAULT_QUEUE, default_provider) | ||
|
|
||
| project = utils.get_application_id() | ||
| start = time.time() | ||
| creds = credentials.get_default()[0] | ||
| preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) | ||
| if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: | ||
| return | ||
|
|
||
| target_size = PREPROCESS_TARGET_SIZE_DEFAULT | ||
| target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT | ||
| if target_size_flag.enabled and target_size_flag.content: | ||
| target_size = int(target_size_flag.content) | ||
| swarming_jobs = _get_swarming_jobs() | ||
| swarming_candidates = _create_candidates_from_jobs(swarming_jobs) | ||
| swarming_provider = ChromeFuzzTaskProvider(swarming_candidates) | ||
| _fill_queue(_SWARMING_QUEUE, swarming_provider) | ||
|
|
||
| num_tasks = target_size - preprocess_queue_size | ||
| logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' | ||
| f'Target: {target_size}. Needed: {num_tasks}.') | ||
|
|
||
| if num_tasks <= 0: | ||
| logs.info('Queue size met or exceeded. Not scheduling tasks.') | ||
| return False | ||
|
|
||
| fuzz_tasks = get_fuzz_tasks(num_tasks) | ||
| if not fuzz_tasks: | ||
| logs.error('No fuzz tasks found to schedule.') | ||
| return False | ||
|
|
||
| logs.info(f'Adding {fuzz_tasks} to preprocess queue.') | ||
| tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) | ||
| logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') | ||
|
|
||
| end = time.time() | ||
| total = end - start | ||
| logs.info(f'Task scheduling took {total} seconds.') | ||
| return True | ||
| def schedule_fuzz_tasks(): | ||
| """Schedules fuzz tasks based on deployment type.""" | ||
| if utils.is_oss_fuzz(): | ||
| _fill_queue(_DEFAULT_QUEUE, OssfuzzFuzzTaskProvider()) | ||
| else: | ||
| schedule_chrome_fuzz_tasks() | ||
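The resulting dispatch is: OSS-Fuzz fills only the default queue; Chrome always fills the default queue and additionally fills the Swarming queue when the `SWARMING_REMOTE_EXECUTION` flag is enabled. A minimal sketch of that decision logic, with the deployment check and the flag passed in as booleans and queue names simplified to labels (all names are hypothetical):

```python
from typing import Callable


def schedule(is_oss_fuzz: bool, swarming_enabled: bool,
             fill: Callable[[str], None]) -> None:
    """Decides which queues to fill for a given deployment."""
    if is_oss_fuzz:
        fill('default')
        return

    # Chrome deployment: the default queue is always filled.
    fill('default')
    if not swarming_enabled:
        return
    # The Swarming queue is filled only behind the feature flag.
    fill('swarming')


if __name__ == '__main__':
    filled: list[str] = []
    schedule(is_oss_fuzz=False, swarming_enabled=True, fill=filled.append)
    print(filled)
```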
|
|
||
|
|
||
| def main(): | ||
| return schedule_fuzz_tasks() | ||
| schedule_fuzz_tasks() | ||