Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions vulnerabilities/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,27 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#

from datetime import datetime
from datetime import timedelta

from django.test import TestCase
from fetchcode.package_versions import PackageVersion
from packageurl import PackageURL
from univers.version_constraint import VersionConstraint
from univers.version_range import GemVersionRange
from univers.version_range import VersionRange
from univers.versions import RubygemsVersion

from vulnerabilities import utils
from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import PackageCommitPatchData
from vulnerabilities.importer import PatchData
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.models import AdvisoryV2
from vulnerabilities.pipelines import insert_advisory_v2
from vulnerabilities.references import XsaReferenceV2
from vulnerabilities.references import ZbxReferenceV2
from vulnerabilities.utils import AffectedPackage
from vulnerabilities.utils import get_item
from vulnerabilities.utils import get_severity_range
Expand Down Expand Up @@ -151,3 +166,88 @@ def test_resolve_version_range_without_ignorable_versions():
def test_get_severity_range():
    """get_severity_range() yields None for empty or blank-only score sets."""
    for empty_scores in ({""}, {}):
        assert get_severity_range(empty_scores) is None


class TestComputeContentIdV2(TestCase):
    """
    Exercise utils.compute_content_id_v2 for an AdvisoryDataV2 object and for
    the AdvisoryV2 model row persisted from it, checking both paths agree.
    """

    # Every commit-patch fixture points at the same VCS; only the hash varies.
    _VCS_URL = "https://foobar.vcs/"

    @classmethod
    def _commit(cls, commit_hash):
        # Shorthand for building one PackageCommitPatchData fixture entry.
        return PackageCommitPatchData(vcs_url=cls._VCS_URL, commit_hash=commit_hash)

    def setUp(self):
        # Both severities reuse the same CVSS vector string.
        vector = "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H"
        severities = [
            VulnerabilitySeverity.from_dict(
                {"system": "cvssv4", "value": "7.5", "scoring_elements": vector}
            ),
            VulnerabilitySeverity.from_dict(
                {"system": "cvssv3", "value": "6.5", "scoring_elements": vector}
            ),
        ]
        package = PackageURL.from_string("pkg:npm/foobar")
        affected_packages = [
            # One entry without commit patches, one with both introduced-by
            # and fixed-by commit patches.
            AffectedPackageV2(
                package=package,
                affected_version_range=VersionRange.from_string("vers:npm/<=1.2.3"),
                fixed_version_range=VersionRange.from_string("vers:npm/1.2.4"),
                introduced_by_commit_patches=[],
                fixed_by_commit_patches=[],
            ),
            AffectedPackageV2(
                package=package,
                affected_version_range=VersionRange.from_string("vers:npm/<=0.2.3"),
                fixed_version_range=VersionRange.from_string("vers:npm/0.2.4"),
                introduced_by_commit_patches=[
                    self._commit("662f801f"),
                    self._commit("001f801f"),
                ],
                fixed_by_commit_patches=[
                    self._commit("982f801f"),
                    self._commit("081f801f"),
                ],
            ),
        ]
        patches = [
            PatchData(patch_url="https://foo.bar/", patch_text="test patch"),
            PatchData(patch_url="https://yet-another-foo.bar/", patch_text="some test patch"),
        ]
        self.advisory1 = AdvisoryDataV2(
            summary="Test advisory",
            aliases=["CVE-2025-0001", "CVE-2024-0001"],
            references=[
                XsaReferenceV2.from_number(248),
                ZbxReferenceV2.from_id("ZBX-000"),
            ],
            severities=severities,
            weaknesses=[296, 233],
            affected_packages=affected_packages,
            patches=patches,
            advisory_id="ADV-001",
            date_published=datetime.now() - timedelta(days=10),
            url="https://example.com/advisory/1",
        )
        # Persist the advisory so the model-based test below can load it back.
        insert_advisory_v2(
            advisory=self.advisory1,
            pipeline_id="test_pipeline_v2",
        )

    def test_compute_content_id_v2(self):
        # Regression pin: the normalized content hash for this fixture must
        # stay stable across code changes.
        self.assertEqual(
            utils.compute_content_id_v2(self.advisory1),
            "5211f1e6c3d935759fb288d79a865eeacc06e3e0e352ab7f5b4cb0e76a43a955",
        )

    def test_content_id_from_adv_data_and_adv_model_are_same(self):
        # The content id must be identical whether computed from the raw
        # AdvisoryDataV2 or from the stored AdvisoryV2 model row.
        id_from_data = utils.compute_content_id_v2(self.advisory1)
        id_from_model = utils.compute_content_id_v2(AdvisoryV2.objects.first())
        self.assertEqual(id_from_data, id_from_model)
59 changes: 19 additions & 40 deletions vulnerabilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,53 +666,32 @@ def compute_content_id_v2(advisory_data):
"""
Compute a unique content_id for an advisory by normalizing its data and hashing it.

:param advisory_data: An AdvisoryData object
:param advisory_data: An AdvisoryDataV2 or AdvisoryV2 object
:return: SHA-256 hash digest as content_id
"""

# Normalize fields
from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.models import AdvisoryV2

if isinstance(advisory_data, AdvisoryV2):
normalized_data = {
"aliases": normalize_list(advisory_data.aliases),
"summary": normalize_text(advisory_data.summary),
"impacted_packages": sorted(
[impact.to_dict() for impact in advisory_data.impacted_packages.all()],
key=lambda x: json.dumps(x, sort_keys=True),
),
"patches": sorted(
[patch.to_patch_data().to_dict() for patch in advisory_data.patches.all()],
key=lambda x: json.dumps(x, sort_keys=True),
),
"references": [ref for ref in normalize_list(advisory_data.references) if ref],
"weaknesses": normalize_list(advisory_data.weaknesses),
}
normalized_data["url"] = advisory_data.url

elif isinstance(advisory_data, AdvisoryDataV2):
normalized_data = {
"advisory_id": normalize_text(advisory_data.advisory_id),
"aliases": normalize_list(advisory_data.aliases),
"summary": normalize_text(advisory_data.summary),
"affected_packages": [
pkg.to_dict() for pkg in normalize_list(advisory_data.affected_packages) if pkg
],
"references": [
ref.to_dict() for ref in normalize_list(advisory_data.references) if ref
],
"severities": [
sev.to_dict() for sev in normalize_list(advisory_data.severities) if sev
],
"weaknesses": normalize_list(advisory_data.weaknesses),
"patches": [patch.to_dict() for patch in normalize_list(advisory_data.patches)],
}
normalized_data["url"] = advisory_data.url

else:
if not isinstance(advisory_data, (AdvisoryV2, AdvisoryDataV2)):
raise ValueError("Unsupported advisory data type for content ID computation")

if isinstance(advisory_data, AdvisoryV2):
advisory_data = advisory_data.to_advisory_data()

normalized_data = {
"advisory_id": normalize_text(advisory_data.advisory_id),
"aliases": normalize_list(advisory_data.aliases),
"summary": normalize_text(advisory_data.summary),
"affected_packages": [
pkg.to_dict() for pkg in normalize_list(advisory_data.affected_packages) if pkg
],
"references": [ref.to_dict() for ref in normalize_list(advisory_data.references) if ref],
"severities": [sev.to_dict() for sev in normalize_list(advisory_data.severities) if sev],
"weaknesses": normalize_list(advisory_data.weaknesses),
"patches": [patch.to_dict() for patch in normalize_list(advisory_data.patches)],
}
normalized_data["url"] = advisory_data.url

normalized_json = json.dumps(normalized_data, separators=(",", ":"), sort_keys=True)
content_id = hashlib.sha256(normalized_json.encode("utf-8")).hexdigest()

Expand Down