diff --git a/vulnerabilities/tests/test_utils.py b/vulnerabilities/tests/test_utils.py index c9ba98e79..0460715be 100644 --- a/vulnerabilities/tests/test_utils.py +++ b/vulnerabilities/tests/test_utils.py @@ -7,12 +7,27 @@ # See https://aboutcode.org for more information about nexB OSS projects. # +from datetime import datetime +from datetime import timedelta + +from django.test import TestCase from fetchcode.package_versions import PackageVersion from packageurl import PackageURL from univers.version_constraint import VersionConstraint from univers.version_range import GemVersionRange +from univers.version_range import VersionRange from univers.versions import RubygemsVersion +from vulnerabilities import utils +from vulnerabilities.importer import AdvisoryDataV2 +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import PackageCommitPatchData +from vulnerabilities.importer import PatchData +from vulnerabilities.importer import VulnerabilitySeverity +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.pipelines import insert_advisory_v2 +from vulnerabilities.references import XsaReferenceV2 +from vulnerabilities.references import ZbxReferenceV2 from vulnerabilities.utils import AffectedPackage from vulnerabilities.utils import get_item from vulnerabilities.utils import get_severity_range @@ -151,3 +166,88 @@ def test_resolve_version_range_without_ignorable_versions(): def test_get_severity_range(): assert get_severity_range({""}) is None assert get_severity_range({}) is None + + +class TestComputeContentIdV2(TestCase): + def setUp(self): + self.advisory1 = AdvisoryDataV2( + summary="Test advisory", + aliases=["CVE-2025-0001", "CVE-2024-0001"], + references=[ + XsaReferenceV2.from_number(248), + ZbxReferenceV2.from_id("ZBX-000"), + ], + severities=[ + VulnerabilitySeverity.from_dict( + { + "system": "cvssv4", + "value": "7.5", + "scoring_elements": "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H", + } + ), + VulnerabilitySeverity.from_dict( + { + "system": "cvssv3", + "value": "6.5", + "scoring_elements": "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H", + } + ), + ], + weaknesses=[296, 233], + affected_packages=[ + AffectedPackageV2( + package=PackageURL.from_string("pkg:npm/foobar"), + affected_version_range=VersionRange.from_string("vers:npm/<=1.2.3"), + fixed_version_range=VersionRange.from_string("vers:npm/1.2.4"), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL.from_string("pkg:npm/foobar"), + affected_version_range=VersionRange.from_string("vers:npm/<=0.2.3"), + fixed_version_range=VersionRange.from_string("vers:npm/0.2.4"), + introduced_by_commit_patches=[ + PackageCommitPatchData( + vcs_url="https://foobar.vcs/", + commit_hash="662f801f", + ), + PackageCommitPatchData( + vcs_url="https://foobar.vcs/", + commit_hash="001f801f", + ), + ], + fixed_by_commit_patches=[ + PackageCommitPatchData( + vcs_url="https://foobar.vcs/", + commit_hash="982f801f", + ), + PackageCommitPatchData( + vcs_url="https://foobar.vcs/", + commit_hash="081f801f", + ), + ], + ), + ], + patches=[ + PatchData(patch_url="https://foo.bar/", patch_text="test patch"), + PatchData(patch_url="https://yet-another-foo.bar/", patch_text="some test patch"), + ], + advisory_id="ADV-001", + date_published=datetime.now() - timedelta(days=10), + url="https://example.com/advisory/1", + ) + insert_advisory_v2( + advisory=self.advisory1, + pipeline_id="test_pipeline_v2", + ) + + def test_compute_content_id_v2(self): + result = utils.compute_content_id_v2(self.advisory1) + self.assertEqual(result, "5211f1e6c3d935759fb288d79a865eeacc06e3e0e352ab7f5b4cb0e76a43a955") + + def test_content_id_from_adv_data_and_adv_model_are_same(self): + id_from_data = utils.compute_content_id_v2(self.advisory1) + advisory_model = AdvisoryV2.objects.first() + id_from_model = utils.compute_content_id_v2(advisory_model) + + self.assertEqual(id_from_data, id_from_model) diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index 82f29bcea..0eb1d1258 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -666,53 +666,32 @@ def compute_content_id_v2(advisory_data): """ Compute a unique content_id for an advisory by normalizing its data and hashing it. - :param advisory_data: An AdvisoryData object + :param advisory_data: An AdvisoryDataV2 or AdvisoryV2 object :return: SHA-256 hash digest as content_id """ - - # Normalize fields from vulnerabilities.importer import AdvisoryDataV2 from vulnerabilities.models import AdvisoryV2 - if isinstance(advisory_data, AdvisoryV2): - normalized_data = { - "aliases": normalize_list(advisory_data.aliases), - "summary": normalize_text(advisory_data.summary), - "impacted_packages": sorted( - [impact.to_dict() for impact in advisory_data.impacted_packages.all()], - key=lambda x: json.dumps(x, sort_keys=True), - ), - "patches": sorted( - [patch.to_patch_data().to_dict() for patch in advisory_data.patches.all()], - key=lambda x: json.dumps(x, sort_keys=True), - ), - "references": [ref for ref in normalize_list(advisory_data.references) if ref], - "weaknesses": normalize_list(advisory_data.weaknesses), - } - normalized_data["url"] = advisory_data.url - - elif isinstance(advisory_data, AdvisoryDataV2): - normalized_data = { - "advisory_id": normalize_text(advisory_data.advisory_id), - "aliases": normalize_list(advisory_data.aliases), - "summary": normalize_text(advisory_data.summary), - "affected_packages": [ - pkg.to_dict() for pkg in normalize_list(advisory_data.affected_packages) if pkg - ], - "references": [ - ref.to_dict() for ref in normalize_list(advisory_data.references) if ref - ], - "severities": [ - sev.to_dict() for sev in normalize_list(advisory_data.severities) if sev - ], - "weaknesses": normalize_list(advisory_data.weaknesses), - "patches": [patch.to_dict() for patch in normalize_list(advisory_data.patches)], - } - normalized_data["url"] = advisory_data.url - - else: + if not isinstance(advisory_data, (AdvisoryV2, AdvisoryDataV2)): raise ValueError("Unsupported advisory data type for content ID computation") + if isinstance(advisory_data, AdvisoryV2): + advisory_data = advisory_data.to_advisory_data() + + normalized_data = { + "advisory_id": normalize_text(advisory_data.advisory_id), + "aliases": normalize_list(advisory_data.aliases), + "summary": normalize_text(advisory_data.summary), + "affected_packages": [ + pkg.to_dict() for pkg in normalize_list(advisory_data.affected_packages) if pkg + ], + "references": [ref.to_dict() for ref in normalize_list(advisory_data.references) if ref], + "severities": [sev.to_dict() for sev in normalize_list(advisory_data.severities) if sev], + "weaknesses": normalize_list(advisory_data.weaknesses), + "patches": [patch.to_dict() for patch in normalize_list(advisory_data.patches)], + } + normalized_data["url"] = advisory_data.url + normalized_json = json.dumps(normalized_data, separators=(",", ":"), sort_keys=True) content_id = hashlib.sha256(normalized_json.encode("utf-8")).hexdigest()