diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 0369abcb62..86ab69ba7e 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -17,9 +17,6 @@ import json import uuid -from google.auth.transport import requests -from google.protobuf import json_format - from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.errors import BadConfigError from clusterfuzz._internal.base.feature_flags import FeatureFlags @@ -31,11 +28,6 @@ from clusterfuzz._internal.protos import swarming_pb2 from clusterfuzz._internal.system import environment -_SWARMING_SCOPES = [ - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/userinfo.email' -] - def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: """Returns True if the task is supposed to run on swarming.""" @@ -54,7 +46,7 @@ def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name) return False - swarming_config = _get_swarming_config() + swarming_config = get_swarming_config() if swarming_config is None: logs.warning( """[Swarming DEBUG] current task is not suitable for swarming. @@ -74,7 +66,7 @@ def _get_task_name(job_name: str): return f't-{str(uuid.uuid4()).lower()}-{job_name}' -def _get_swarming_config() -> local_config.SwarmingConfig | None: +def get_swarming_config() -> local_config.SwarmingConfig | None: """Returns the swarming config.""" try: return local_config.SwarmingConfig() @@ -87,7 +79,7 @@ def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list ) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member """ Gets all swarming dimensions for a task. Job dimensions have more precedence than static dimensions""" - swarming_config = _get_swarming_config() + swarming_config = get_swarming_config() if not swarming_config: logs.error( '[Swarming] No dimensions set. Reason: failed to retrieve config') @@ -95,7 +87,7 @@ def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list unique_dimensions = {} unique_dimensions['os'] = str(job.platform).capitalize() - unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool') + unique_dimensions['pool'] = swarming_config.get('swarming_pool') for dimension in platform_specific_dimensions: unique_dimensions[dimension['key'].lower()] = dimension['value'] @@ -202,7 +194,7 @@ def create_new_task_request(command: str, job_name: str, download_url: str if job is None: return None - swarming_config = _get_swarming_config() + swarming_config = get_swarming_config() if not swarming_config: return None @@ -255,36 +247,3 @@ def create_new_task_request(command: str, job_name: str, download_url: str ]) return new_task_request - - -def push_swarming_task(task_request: swarming_pb2.NewTaskRequest): # pylint: disable=no-member - """Schedules a task on swarming.""" - swarming_config = _get_swarming_config() - if not swarming_config: - logs.error( - '[Swarming] Failed to push task into swarming. Reason: No config.') - return - creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES) - if not creds: - logs.error( - '[Swarming] Failed to push task into swarming. Reason: No credentials.') - return - - if not creds.token: - creds.refresh(requests.Request()) - - headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {creds.token}' - } - swarming_server = _get_swarming_config().get('swarming_server') - url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask' - message_body = json_format.MessageToJson(task_request) - logs.info( - f"""[Swarming] Pushing task {task_request.name} - as {creds.service_account_email}""", - url=url, - body=message_body) - response = utils.post_url(url=url, data=message_body, headers=headers) - logs.info(f'[Swarming] Response from {task_request.name}', response=response) diff --git a/src/clusterfuzz/_internal/swarming/api.py b/src/clusterfuzz/_internal/swarming/api.py new file mode 100644 index 0000000000..ac44263b26 --- /dev/null +++ b/src/clusterfuzz/_internal/swarming/api.py @@ -0,0 +1,156 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Swarming pRPC API client.""" + +from typing import Optional + +from google.auth import exceptions as auth_exceptions +from google.protobuf import json_format + +from clusterfuzz._internal.base import utils +from clusterfuzz._internal.config.local_config import SwarmingConfig +from clusterfuzz._internal.google_cloud_utils import credentials +from clusterfuzz._internal.metrics import logs +from clusterfuzz._internal.protos import swarming_pb2 +from clusterfuzz._internal.swarming import get_swarming_config + +_SWARMING_SCOPES = [ + 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/userinfo.email' +] + +_COUNT_TASKS_ENDPOINT = 'swarming.v2.Tasks/CountTasks' +_NEW_TASK_ENDPOINT = 'swarming.v2.Tasks/NewTask' + + +class SwarmingApi: + """Client for Swarming pRPC API.""" + + _config: SwarmingConfig + _base_url: str = "" + + def __init__(self, config: SwarmingConfig): + self._config = config + self._base_url = f"https://{self._config.get('swarming_server')}/prpc/" + + @staticmethod + def create() -> Optional['SwarmingApi']: + """Creates a SwarmingApi instance if config is available. + + Returns: + A SwarmingApi instance if config is available, None otherwise. + """ + config = get_swarming_config() + if config is None: + return None + + return SwarmingApi(config) + + def _get_token(self) -> str: + """Gets a valid token for the Swarming API. Returns "" if it fails.""" + try: + creds = credentials.get_scoped_service_account_credentials( + _SWARMING_SCOPES) + if not creds: + logs.error('[Swarming] Failed to get credentials. None found.') + return "" + + return creds.token + except (auth_exceptions.DefaultCredentialsError, + auth_exceptions.RefreshError, auth_exceptions.TransportError) as e: + logs.error(f'[Swarming] Failed to get token with: {e}.') + return "" + + def _get_headers(self) -> dict[str, str]: + """Checks config and returns headers for pRPC request. + + Returns: + A dict containing headers. + """ + token = self._get_token() + + return { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {token}' + } + + def _make_request(self, endpoint: str, body: str) -> str | None: + """Makes a pRPC request to the Swarming API. + + Args: + endpoint: The pRPC endpoint (e.g. "swarming.v2.Tasks/NewTask"). + body: The JSON body of the request. + + Returns: + The raw JSON response string from the server, or None if the response is + empty. + + Raises: + requests.exceptions.HTTPError: If the request fails with a 4xx or 5xx + status code. + """ + headers = self._get_headers() + + url = f'{self._base_url}{endpoint}' + logs.info( + f"[Swarming] Making request to {url}", + url=self._base_url, + endpoint=endpoint, + body=body, + headers=headers) + response = utils.post_url(url=url, data=body, headers=headers) + if not response: + logs.error(f"[Swarming] Failed to make request to {url}. Empty response") + return None + return response + + def push_task(self, task_request: swarming_pb2.NewTaskRequest) -> str | None: # pylint: disable=no-member + """Schedules a task on swarming. + + Args: + task_request: The NewTaskRequest proto message. + + Returns: + The raw JSON response string from the server, or None if the response is + empty. + + Raises: + requests.exceptions.HTTPError: If the request fails with a 4xx or 5xx + status code. + """ + message_body = json_format.MessageToJson(task_request) + + response = self._make_request(_NEW_TASK_ENDPOINT, message_body) + return response + + def count_tasks(self, + count_request: swarming_pb2.TasksCountRequest) -> str | None: # pylint: disable=no-member + """Counts tasks on swarming. + + Args: + count_request: The TasksCountRequest proto message. + + Returns: + The raw JSON response string from the server, or None if the response is + empty. + + Raises: + requests.exceptions.HTTPError: If the request fails with a 4xx or 5xx + status code. + """ + message_body = json_format.MessageToJson(count_request) + + response = self._make_request(_COUNT_TASKS_ENDPOINT, message_body) + return response diff --git a/src/clusterfuzz/_internal/swarming/service.py b/src/clusterfuzz/_internal/swarming/service.py index 30c1bad677..6b430c72df 100644 --- a/src/clusterfuzz/_internal/swarming/service.py +++ b/src/clusterfuzz/_internal/swarming/service.py @@ -12,16 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. """Swarming service.""" +from requests.exceptions import HTTPError from clusterfuzz._internal import swarming from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.remote_task import remote_task_types +from clusterfuzz._internal.swarming.api import SwarmingApi class SwarmingService(remote_task_types.RemoteTaskInterface): """Remote task service implementation for Swarming.""" + _api: SwarmingApi | None = None + + def __init__(self): + self._api = SwarmingApi.create() + def create_utask_main_job(self, module: str, job_type: str, input_download_url: str): """Creates a single swarming task for a uworker main task.""" @@ -51,10 +58,11 @@ def create_utask_main_jobs(self, continue if request := swarming.create_new_task_request( task.command, task.job_type, task.argument): - swarming.push_swarming_task(request) - except Exception: # pylint: disable=broad-except - logs.error( - f'Failed to push task to Swarming: {task.command}, {task.job_type}.' - ) + self._api.push_task(request) + except HTTPError as api_failure: + logs.warning( + f'''Failed to push task to Swarming: {task.command}, {task.job_type} + . Reason: {api_failure}. + ''') unscheduled_tasks.append(task) return unscheduled_tasks diff --git a/src/clusterfuzz/_internal/tests/core/swarming/api_test.py b/src/clusterfuzz/_internal/tests/core/swarming/api_test.py new file mode 100644 index 0000000000..eab1561dd3 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/swarming/api_test.py @@ -0,0 +1,154 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for api.py.""" +import unittest +from unittest import mock + +from google.protobuf import json_format + +from clusterfuzz._internal.protos import swarming_pb2 +from clusterfuzz._internal.swarming.api import SwarmingApi +from clusterfuzz._internal.tests.test_libs import helpers + + +class SwarmingAPITest(unittest.TestCase): + """Tests for SwarmingAPI.""" + + def setUp(self): + helpers.patch(self, [ + 'clusterfuzz._internal.base.utils.post_url', + 'clusterfuzz._internal.google_cloud_utils.credentials.get_scoped_service_account_credentials', + 'google.auth.transport.requests.Request', + ]) + helpers.patch_environ(self) + + self.mock_creds = mock.MagicMock() + self.mock_creds.token = 'fake_token' + self.mock.get_scoped_service_account_credentials.return_value = self.mock_creds + + self.api = SwarmingApi.create() + + def test_push_task(self): + """Tests that push_task works as expected.""" + expected_response = '{"taskId": "123"}' + self.mock.post_url.return_value = expected_response + task_request = swarming_pb2.NewTaskRequest( + name='test_task', + priority=1, + realm='realm-name', + service_account='test-account@google.com', + task_slices=[ + swarming_pb2.TaskSlice( + expiration_secs=86400, + properties=swarming_pb2.TaskProperties( + command=['./run.sh'], + dimensions=[ + swarming_pb2.StringPair(key='os', value='Linux'), + swarming_pb2.StringPair(key='pool', value='test-pool') + ], + execution_timeout_secs=3600, + env=[ + swarming_pb2.StringPair(key='UWORKER', value='True'), + ], + secret_bytes=b'secret_data')) + ]) + + response = self.api.push_task(task_request) + + expected_headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer fake_token' + } + expected_url = 'https://server-name/prpc/swarming.v2.Tasks/NewTask' + self.mock.post_url.assert_called_with( + url=expected_url, + data=json_format.MessageToJson(task_request), + headers=expected_headers) + self.assertEqual(response, expected_response) + + def test_count_tasks(self): + """Tests that count_tasks works as expected.""" + count_request = swarming_pb2.TasksCountRequest(tags=['tag1']) + + self.mock.post_url.return_value = '{"count": 42}' + + response = self.api.count_tasks(count_request) + + expected_headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer fake_token' + } + expected_url = 'https://server-name/prpc/swarming.v2.Tasks/CountTasks' + self.mock.post_url.assert_called_with( + url=expected_url, + data=json_format.MessageToJson(count_request), + headers=expected_headers) + + self.assertEqual(response, '{"count": 42}') + + def test_create_no_config(self): + """Tests that create returns None when config is missing.""" + with mock.patch('clusterfuzz._internal.swarming.api.get_swarming_config' + ) as mock_get_config: + mock_get_config.return_value = None + self.assertIsNone(SwarmingApi.create()) + + def test_push_task_no_credentials(self): + """Tests that push_task gets called with an empty token when credentials are missing.""" + self.mock.get_scoped_service_account_credentials.return_value = None + self.api.push_task(swarming_pb2.NewTaskRequest()) + + _, kwargs = self.mock.post_url.call_args + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer ') + + def test_count_tasks_no_credentials(self): + """Tests that count_tasks gets called with an empty token when credentials are missing.""" + self.mock.get_scoped_service_account_credentials.return_value = None + self.api.count_tasks(swarming_pb2.TasksCountRequest()) + + _, kwargs = self.mock.post_url.call_args + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer ') + + def test_push_task_auth_error(self): + """Tests that push_task raises HTTPError on auth failure.""" + from requests.exceptions import HTTPError + self.mock.get_scoped_service_account_credentials.return_value = None + self.mock.post_url.side_effect = HTTPError( + "Unauthorized", response=mock.Mock(status_code=401)) + + with self.assertRaises(HTTPError): + self.api.push_task(swarming_pb2.NewTaskRequest()) + + def test_count_tasks_auth_error(self): + """Tests that count_tasks raises HTTPError on auth failure.""" + from requests.exceptions import HTTPError + self.mock.get_scoped_service_account_credentials.return_value = None + self.mock.post_url.side_effect = HTTPError( + "Unauthorized", response=mock.Mock(status_code=401)) + + with self.assertRaises(HTTPError): + self.api.count_tasks(swarming_pb2.TasksCountRequest()) + + def test_get_token_catches_default_credentials_error(self): + """Tests that _get_token catches DefaultCredentialsError and returns empty token.""" + from google.auth.exceptions import DefaultCredentialsError + self.mock.get_scoped_service_account_credentials.side_effect = DefaultCredentialsError( + "No creds") + + self.api.push_task(swarming_pb2.NewTaskRequest()) + + _, kwargs = self.mock.post_url.call_args + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer ') diff --git a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py index 234150cf5f..718b8277df 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py @@ -27,12 +27,14 @@ class SwarmingServiceTest(unittest.TestCase): def setUp(self): helpers.patch(self, [ 'clusterfuzz._internal.swarming.is_swarming_task', - 'clusterfuzz._internal.swarming.push_swarming_task', + 'clusterfuzz._internal.swarming.api.SwarmingApi.create', 'clusterfuzz._internal.swarming.create_new_task_request', 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module', 'clusterfuzz._internal.metrics.logs.error', 'clusterfuzz._internal.google_cloud_utils.compute_metadata.get', ]) + self.mock_api = mock.MagicMock() + self.mock.create.return_value = self.mock_api self.service = service.SwarmingService() self.mock.create_new_task_request.return_value = 'fake_request' self.mock.get.return_value = None @@ -48,7 +50,7 @@ def test_create_utask_main_job_success(self): # Success returns None in this interface (consistent with GcpBatchService) self.assertIsNone(result) - self.mock.push_swarming_task.assert_called_once_with('fake_request') + self.mock_api.push_task.assert_called_once_with('fake_request') def test_create_utask_main_job_failure(self): """Test creating a single task that is not a swarming task.""" @@ -61,7 +63,7 @@ def test_create_utask_main_job_failure(self): # Failure returns the task itself self.assertIsInstance(result, remote_task_types.RemoteTask) self.assertEqual(result.command, 'fuzz') - self.mock.push_swarming_task.assert_not_called() + self.mock_api.push_task.assert_not_called() def test_create_utask_main_jobs_mixed_results(self): """Test creating multiple tasks with mixed success/failure.""" @@ -79,8 +81,8 @@ def test_create_utask_main_jobs_mixed_results(self): self.assertEqual(len(unscheduled), 1) self.assertEqual(unscheduled[0].job_type, 'job2') - self.assertEqual(self.mock.push_swarming_task.call_count, 2) - self.mock.push_swarming_task.assert_has_calls([ + self.assertEqual(self.mock_api.push_task.call_count, 2) + self.mock_api.push_task.assert_has_calls([ mock.call('fake_request'), mock.call('fake_request'), ]) @@ -96,7 +98,7 @@ def test_create_utask_main_jobs_all_success(self): unscheduled = self.service.create_utask_main_jobs(tasks) self.assertEqual(unscheduled, []) - self.assertEqual(self.mock.push_swarming_task.call_count, 2) + self.assertEqual(self.mock_api.push_task.call_count, 2) def test_create_utask_main_jobs_all_fail(self): """Test creating multiple tasks where all fail.""" @@ -109,13 +111,13 @@ def test_create_utask_main_jobs_all_fail(self): unscheduled = self.service.create_utask_main_jobs(tasks) self.assertEqual(unscheduled, tasks) - self.mock.push_swarming_task.assert_not_called() + self.mock_api.push_task.assert_not_called() def test_create_utask_main_jobs_empty(self): """Test creating tasks with an empty list.""" unscheduled = self.service.create_utask_main_jobs([]) self.assertEqual(unscheduled, []) - self.mock.push_swarming_task.assert_not_called() + self.mock_api.push_task.assert_not_called() def test_create_utask_main_jobs_exception(self): """Test creating tasks when push_swarming_task raises an exception.""" @@ -124,11 +126,7 @@ def test_create_utask_main_jobs_exception(self): ] self.mock.is_swarming_task.return_value = True - self.mock.push_swarming_task.side_effect = Exception('error') + self.mock_api.push_task.side_effect = Exception('error') - unscheduled = self.service.create_utask_main_jobs(tasks) - - self.assertEqual(len(unscheduled), 1) - self.assertEqual(unscheduled[0].job_type, 'job1') - self.mock.error.assert_called_once_with( - 'Failed to push task to Swarming: fuzz, job1.') + with self.assertRaises(Exception): + self.service.create_utask_main_jobs(tasks) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index c48d154058..fb8b99f67c 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -16,8 +16,6 @@ import unittest from unittest import mock -from google.protobuf import json_format - from clusterfuzz._internal import swarming from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.protos import swarming_pb2 @@ -252,109 +250,6 @@ def test_get_spec_from_config_for_fuzz_task(self): ]) self.assertEqual(spec, expected_spec) - def test_push_swarming_task(self): - """Tests that push_swarming_task works as expected.""" - mock_creds = mock.MagicMock() - mock_creds.token = 'fake_token' - self.mock.get_scoped_service_account_credentials.return_value = mock_creds - - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - task_request = swarming.create_new_task_request('fuzz', job.name, - 'https://download_url') - swarming.push_swarming_task(task_request) - - expected_new_task_request = swarming_pb2.NewTaskRequest( - name='task_name', - priority=1, - realm='realm-name', - service_account='test-clusterfuzz-service-account-email', - task_slices=[ - swarming_pb2.TaskSlice( - expiration_secs=86400, - properties=swarming_pb2.TaskProperties( - command=[ - 'luci-auth', 'context', '--', './linux_entry_point.sh' - ], - dimensions=[ - swarming_pb2.StringPair( - key='os', value=str(job.platform).capitalize()), - swarming_pb2.StringPair(key='pool', value='pool-name') - ], - cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member - cas_input_root=swarming_pb2.CASReference( - cas_instance= - 'projects/server-name/instances/instance_name', - digest=swarming_pb2.Digest( - hash='linux_entry_point_archive_hash', - size_bytes=1234)), - execution_timeout_secs=12345, - env=[ - swarming_pb2.StringPair( - key='DOCKER_IMAGE', - value= - 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' - ), - swarming_pb2.StringPair(key='UWORKER', value='True'), - swarming_pb2.StringPair( - key='SWARMING_BOT', value='True'), - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), - swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), - swarming_pb2.StringPair( - key='DISABLE_MOUNTS', value='True'), - swarming_pb2.StringPair( - key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), - swarming_pb2.StringPair( - key='DOCKER_ENV_VARS', - value= - ('{"DOCKER_IMAGE": "gcr.io/clusterfuzz-images/' - 'base:a2f4dd6-202202070654", "UWORKER": "True", ' - '"SWARMING_BOT": "True", "LOG_TO_GCP": "True", ' - '"IS_K8S_ENV": "True", "DISABLE_MOUNTS": "True", ' - '"LOGGING_CLOUD_PROJECT_ID": "project_id"}')), - ], - secret_bytes='https://download_url'.encode('utf-8'))) - ]) - - self.mock.get_scoped_service_account_credentials.assert_called_with( - swarming._SWARMING_SCOPES) # pylint: disable=protected-access - expected_headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': 'Bearer fake_token' - } - expected_url = 'https://server-name/prpc/swarming.v2.Tasks/NewTask' - self.mock.post_url.assert_called_with( - url=expected_url, - data=json_format.MessageToJson(expected_new_task_request), - headers=expected_headers) - - def test_push_swarming_task_with_refresh(self): - """Tests that push_swarming_task refreshes credentials if token is missing.""" - mock_creds = mock.MagicMock() - mock_creds.token = None - self.mock.get_scoped_service_account_credentials.return_value = mock_creds - - def refresh_side_effect(_): - mock_creds.token = 'refreshed_token' - - mock_creds.refresh.side_effect = refresh_side_effect - - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - request = swarming.create_new_task_request('fuzz', job.name, - 'https://download_url') - swarming.push_swarming_task(request) - - mock_creds.refresh.assert_called_with(self.mock.Request.return_value) - expected_headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': 'Bearer refreshed_token' - } - self.assertEqual(self.mock.post_url.call_args[1]['headers'], - expected_headers) - def test_is_swarming_task(self): """Tests that is_swarming_task works as expected.""" job = data_types.Job(