|
1 | 1 | """ |
2 | | -A simple library for generating the short-form vendor security review report. |
| 2 | +A library for generating Security Review Reports in both JSON and CoRIM formats. |
3 | 3 |
|
4 | 4 | This script is intended to be used by Security Review Providers who are |
5 | 5 | participating in the Open Compute Project's Firmware Security Review Framework. |
6 | 6 | The script complies with version 0.3 (draft) of the Security Review Framework |
7 | | -document. |
| 7 | +document and supports the new CoRIM (CBOR) format. |
8 | 8 |
|
9 | 9 | More details about the OCP review framework can be found here: |
10 | 10 | * https://www.opencompute.org/wiki/Security |
11 | 11 |
|
12 | 12 | For example usage of this script, refer to the following: |
13 | | - * example_generate.py: |
14 | | - Demonstrates how to generate, sign and verify the JSON report. |
| 13 | + * example_gen_sign_verify.py: |
| 14 | + Demonstrates how to generate, sign and verify reports in both formats. |
15 | 15 | * sample_report.json |
16 | 16 | An example JSON report that was created by this script. |
17 | 17 |
|
18 | | -Author: Jeremy Boone, NCC Group |
19 | | -Date : June 5th, 2023 |
| 18 | +Author: Jeremy Boone, NCC Group (original), Extended for CoRIM support |
| 19 | +Date : June 5th, 2023 (original), January 2025 (CoRIM extension) |
20 | 20 | """ |
21 | 21 |
|
22 | 22 | import time |
23 | 23 | import json |
24 | 24 | import jwt |
25 | 25 | import base64 |
26 | 26 | import hashlib |
| 27 | +import cbor2 |
| 28 | +from datetime import datetime |
| 29 | +from typing import Dict, List, Optional, Union, Any |
| 30 | + |
27 | 31 | from cryptography.hazmat.primitives import serialization |
28 | 32 | from cryptography.hazmat.backends import default_backend |
29 | 33 | from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey |
|
33 | 37 | SECP521R1, |
34 | 38 | ) |
35 | 39 |
|
| 40 | +try: |
| 41 | + import cwt |
| 42 | + from cwt import COSEKey |
| 43 | + COSE_AVAILABLE = True |
| 44 | +except ImportError: |
| 45 | + COSE_AVAILABLE = False |
| 46 | + # CoRIM signing will not work without cwt, but JSON functionality remains |
| 47 | + |
36 | 48 | from azure.identity import DefaultAzureCredential |
37 | 49 | from azure.keyvault.keys import KeyClient |
38 | 50 | from azure.keyvault.keys.crypto import CryptographyClient, SignatureAlgorithm |
|
56 | 68 | 4096, # RSA 512 |
57 | 69 | ) |
58 | 70 |
|
| 71 | +# CoRIM specific constants |
| 72 | +DEVICE_CATEGORIES = { |
| 73 | + "storage": 0, |
| 74 | + "network": 1, |
| 75 | + "gpu": 2, |
| 76 | + "cpu": 3, |
| 77 | + "apu": 4, |
| 78 | + "bmc": 5 |
| 79 | +} |
| 80 | + |
| 81 | +# CBOR tags for CoRIM |
| 82 | +CORIM_TAG = 501 |
| 83 | +COMID_TAG = 506 |
| 84 | + |
59 | 85 |
|
60 | 86 | class ShortFormReport(object): |
61 | 87 | def __init__(self, framework_ver: str = "1.1"): |
62 | 88 | self.report = {} |
63 | 89 | self.report["review_framework_version"] = f"{framework_ver}".strip() |
64 | 90 | self.signed_report = None |
| 91 | + self.signed_corim = None |
65 | 92 |
|
66 | 93 | def add_device( |
67 | 94 | self, |
@@ -262,6 +289,281 @@ def sign_report(self, priv_key: bytes, algo: str, kid: str) -> bool: |
262 | 289 | ) |
263 | 290 | return True |
264 | 291 |
|
| 292 | + ########################################################################### |
| 293 | + # CoRIM format methods (new functionality) |
| 294 | + ########################################################################### |
| 295 | + |
| 296 | + def _convert_to_corim_structure(self) -> Dict[str, Any]: |
| 297 | + """Convert internal JSON structure to CoRIM structure.""" |
| 298 | + if "audit" not in self.report or "device" not in self.report: |
| 299 | + raise ValueError("Report must have both device and audit information") |
| 300 | + |
| 301 | + # Parse completion date to Unix timestamp with CBOR tag 1 |
| 302 | + date_str = self.report["audit"]["completion_date"] |
| 303 | + try: |
| 304 | + dt = datetime.strptime(date_str, "%Y-%m-%d") |
| 305 | + completion_timestamp = cbor2.CBORTag(1, int(dt.timestamp())) |
| 306 | + except ValueError: |
| 307 | + raise ValueError(f"Invalid date format: {date_str}. Expected YYYY-MM-DD") |
| 308 | + |
| 309 | + # Build fw-identifier structure |
| 310 | + fw_identifiers = [] |
| 311 | + fw_id = {} |
| 312 | + |
| 313 | + # Add version information |
| 314 | + if self.report["device"]["fw_version"]: |
| 315 | + fw_id[0] = { # fw-version |
| 316 | + 0: self.report["device"]["fw_version"], # version |
| 317 | + 1: "semver" # version-scheme (default) |
| 318 | + } |
| 319 | + |
| 320 | + # Add digests |
| 321 | + digests = [] |
| 322 | + if self.report["device"]["fw_hash_sha2_384"]: |
| 323 | + digests.append([-43, bytes.fromhex(self.report["device"]["fw_hash_sha2_384"])]) # SHA-384 |
| 324 | + if self.report["device"]["fw_hash_sha2_512"]: |
| 325 | + digests.append([-44, bytes.fromhex(self.report["device"]["fw_hash_sha2_512"])]) # SHA-512 |
| 326 | + |
| 327 | + if digests: |
| 328 | + fw_id[1] = digests # fw-file-digests |
| 329 | + |
| 330 | + # Add repo tag |
| 331 | + if self.report["device"]["repo_tag"]: |
| 332 | + fw_id[2] = self.report["device"]["repo_tag"] # repo-tag |
| 333 | + |
| 334 | + # Add manifest if present |
| 335 | + if "manifest" in self.report["device"]: |
| 336 | + manifest_entries = [] |
| 337 | + for entry in self.report["device"]["manifest"]: |
| 338 | + manifest_entries.append({ |
| 339 | + 0: entry["file_name"], # filename |
| 340 | + 1: [[-44, bytes.fromhex(entry["file_hash"])]] # file-hash (assuming SHA-512) |
| 341 | + }) |
| 342 | + |
| 343 | + # Calculate manifest digest |
| 344 | + manifest_str = json.dumps(self.report["device"]["manifest"], |
| 345 | + sort_keys=False, separators=(',', ':')).encode('utf-8') |
| 346 | + manifest_digest = hashlib.sha512(manifest_str).digest() |
| 347 | + |
| 348 | + fw_id[3] = { # src-manifest |
| 349 | + 0: [[-44, manifest_digest]], # manifest-digest |
| 350 | + 1: manifest_entries # manifest |
| 351 | + } |
| 352 | + |
| 353 | + fw_identifiers.append(fw_id) |
| 354 | + |
| 355 | + # Convert device category to integer |
| 356 | + category_str = self.report["device"]["category"].lower() |
| 357 | + device_category = None |
| 358 | + for cat, val in DEVICE_CATEGORIES.items(): |
| 359 | + if cat in category_str: |
| 360 | + device_category = val |
| 361 | + break |
| 362 | + |
| 363 | + # Convert issues |
| 364 | + corim_issues = [] |
| 365 | + for issue in self.report["audit"]["issues"]: |
| 366 | + corim_issue = { |
| 367 | + 0: issue["title"], # title |
| 368 | + 1: issue["cvss_score"], # cvss-score |
| 369 | + 2: issue["cvss_vector"], # cvss-vector |
| 370 | + 3: issue["cwe"], # cwe |
| 371 | + 4: issue["description"], # description |
| 372 | + } |
| 373 | + |
| 374 | + # Add optional fields |
| 375 | + if "cvss_version" in self.report["audit"]: |
| 376 | + corim_issue[5] = self.report["audit"]["cvss_version"] # cvss-version |
| 377 | + |
| 378 | + if issue.get("cve"): |
| 379 | + corim_issue[6] = issue["cve"] # cve |
| 380 | + |
| 381 | + corim_issues.append(corim_issue) |
| 382 | + |
| 383 | + # Build the ocp-safe-sfr-map |
| 384 | + sfr_map = { |
| 385 | + 0: self.report["review_framework_version"], # review-framework-version |
| 386 | + 1: self.report["audit"]["report_version"], # report-version |
| 387 | + 2: completion_timestamp, # completion-date |
| 388 | + 3: self.report["audit"]["scope_number"], # scope-number |
| 389 | + 4: fw_identifiers, # fw-identifiers |
| 390 | + } |
| 391 | + |
| 392 | + if device_category is not None: |
| 393 | + sfr_map[5] = device_category # device-category |
| 394 | + |
| 395 | + if corim_issues: |
| 396 | + sfr_map[6] = corim_issues # issues |
| 397 | + |
| 398 | + return sfr_map |
| 399 | + |
| 400 | + def _build_corim_structure(self, sfr_map: Dict[str, Any]) -> Dict[str, Any]: |
| 401 | + """Build the complete CoRIM structure with embedded SFR data.""" |
| 402 | + |
| 403 | + # Create the measurement-values-map with SFR extension |
| 404 | + measurement_values = { |
| 405 | + 1029: sfr_map # ocp-safe-sfr extension |
| 406 | + } |
| 407 | + |
| 408 | + # Create measurement-map for endorsement |
| 409 | + endorsement_measurement_map = { |
| 410 | + 1: measurement_values # mval |
| 411 | + } |
| 412 | + |
| 413 | + # Create measurement-map for conditions (with digests) |
| 414 | + condition_measurement_map = { |
| 415 | + 1: { # mval |
| 416 | + 2: self._get_fw_digests() # digests |
| 417 | + } |
| 418 | + } |
| 419 | + |
| 420 | + # Create endorsed-triple-record |
| 421 | + endorsed_triple = [ |
| 422 | + # environment-map |
| 423 | + { |
| 424 | + 0: { # class |
| 425 | + 1: self.report["device"]["vendor"], # vendor |
| 426 | + 2: self.report["device"]["product"] # model |
| 427 | + } |
| 428 | + }, |
| 429 | + # endorsement (array of measurement-map) |
| 430 | + [endorsement_measurement_map] |
| 431 | + ] |
| 432 | + |
| 433 | + # Create stateful-environment-record for conditions |
| 434 | + stateful_environment = [ |
| 435 | + # environment-map |
| 436 | + { |
| 437 | + 0: { # class |
| 438 | + 1: self.report["device"]["vendor"], # vendor |
| 439 | + 2: self.report["device"]["product"] # model |
| 440 | + } |
| 441 | + }, |
| 442 | + # claims-list (measurement-map array) |
| 443 | + [condition_measurement_map] |
| 444 | + ] |
| 445 | + |
| 446 | + # Create conditional-endorsement-triple-record |
| 447 | + conditional_endorsement = [ |
| 448 | + # conditions (stateful-environment-record array) |
| 449 | + [stateful_environment], |
| 450 | + # endorsements (endorsed-triple-record array) |
| 451 | + [endorsed_triple] |
| 452 | + ] |
| 453 | + |
| 454 | + # Create concise-mid-tag |
| 455 | + comid = { |
| 456 | + 1: { # tag-identity |
| 457 | + 0: f"{self.report['device']['vendor'].lower().replace(' ', '-')}-review-comid-001" # tag-id |
| 458 | + }, |
| 459 | + 4: { # triples |
| 460 | + 10: [conditional_endorsement] # conditional-endorsement-triples |
| 461 | + } |
| 462 | + } |
| 463 | + |
| 464 | + # Create the main CoRIM structure |
| 465 | + corim = { |
| 466 | + 0: f"sfr-corim-{int(time.time())}", # id |
| 467 | + 1: [cbor2.CBORTag(COMID_TAG, cbor2.dumps(comid))], # tags |
| 468 | + 5: [ # entities |
| 469 | + { |
| 470 | + 0: self.report["audit"]["srp"], # entity-name |
| 471 | + 2: [1] # role: manifest-creator |
| 472 | + } |
| 473 | + ] |
| 474 | + } |
| 475 | + |
| 476 | + return corim |
| 477 | + |
| 478 | + def _get_fw_digests(self) -> List[List]: |
| 479 | + """Get firmware digests in CoRIM format.""" |
| 480 | + digests = [] |
| 481 | + if self.report["device"]["fw_hash_sha2_384"]: |
| 482 | + digests.append([-43, bytes.fromhex(self.report["device"]["fw_hash_sha2_384"])]) |
| 483 | + if self.report["device"]["fw_hash_sha2_512"]: |
| 484 | + digests.append([-44, bytes.fromhex(self.report["device"]["fw_hash_sha2_512"])]) |
| 485 | + return digests |
| 486 | + |
| 487 | + def get_report_as_corim_dict(self) -> Dict[str, Any]: |
| 488 | + """Returns the report as a CoRIM-structured dictionary.""" |
| 489 | + sfr_map = self._convert_to_corim_structure() |
| 490 | + return self._build_corim_structure(sfr_map) |
| 491 | + |
| 492 | + def get_report_as_corim_cbor(self) -> bytes: |
| 493 | + """Returns the report as CBOR-encoded CoRIM bytes.""" |
| 494 | + corim_dict = self.get_report_as_corim_dict() |
| 495 | + tagged_corim = cbor2.CBORTag(CORIM_TAG, corim_dict) |
| 496 | + return cbor2.dumps(tagged_corim) |
| 497 | + |
| 498 | + def get_report_as_corim_diag(self) -> str: |
| 499 | + """Returns the report as human-readable CBOR diagnostic notation.""" |
| 500 | + corim_cbor = self.get_report_as_corim_cbor() |
| 501 | + # This is a simplified diagnostic representation |
| 502 | + # In practice, you might want to use a proper CBOR diagnostic tool |
| 503 | + return f"CBOR data ({len(corim_cbor)} bytes): {corim_cbor.hex()}" |
| 504 | + |
| 505 | + def sign_corim(self, priv_key: bytes, algo: str, kid: str) -> bool: |
| 506 | + """Sign the CoRIM report using COSE-Sign1 with the cwt library. |
| 507 | + |
| 508 | + Uses the cwt (CBOR Web Token) library for better COSE compatibility. |
| 509 | + """ |
| 510 | + if not COSE_AVAILABLE: |
| 511 | + print("cwt library not available. Cannot sign CoRIM.") |
| 512 | + return False |
| 513 | + |
| 514 | + try: |
| 515 | + # Load private key using cryptography |
| 516 | + pem = serialization.load_pem_private_key( |
| 517 | + priv_key, None, backend=default_backend() |
| 518 | + ) |
| 519 | + |
| 520 | + # Map algorithm to COSE algorithm identifier |
| 521 | + cose_alg = None |
| 522 | + if algo == "ES512" and isinstance(pem, EllipticCurvePrivateKey) and pem.curve.name == "secp521r1": |
| 523 | + cose_alg = -36 # ES512 |
| 524 | + elif algo == "ES384" and isinstance(pem, EllipticCurvePrivateKey) and pem.curve.name == "secp384r1": |
| 525 | + cose_alg = -35 # ES384 |
| 526 | + elif algo == "PS512" and isinstance(pem, RSAPrivateKey): |
| 527 | + cose_alg = -38 # PS512 |
| 528 | + elif algo == "PS384" and isinstance(pem, RSAPrivateKey): |
| 529 | + cose_alg = -37 # PS384 |
| 530 | + else: |
| 531 | + print(f"Unsupported algorithm/key combination: {algo} with {type(pem)}") |
| 532 | + return False |
| 533 | + |
| 534 | + # Create Signer using cwt library |
| 535 | + signer = cwt.Signer.from_pem(priv_key, alg=cose_alg, kid=kid) |
| 536 | + |
| 537 | + # Get CoRIM payload as claims (cwt expects claims, not raw payload) |
| 538 | + corim_cbor = self.get_report_as_corim_cbor() |
| 539 | + |
| 540 | + # For COSE signing, we need to create claims structure |
| 541 | + # The CoRIM data becomes the payload claim |
| 542 | + claims = { |
| 543 | + # Use a custom claim number for CoRIM data |
| 544 | + -65537: corim_cbor # Custom claim for CoRIM payload |
| 545 | + } |
| 546 | + |
| 547 | + # Sign using cwt library with the signer |
| 548 | + signed_corim = cwt.encode_and_sign( |
| 549 | + claims=claims, |
| 550 | + signers=[signer], |
| 551 | + tagged=True # Use CBOR tag for COSE_Sign1 |
| 552 | + ) |
| 553 | + |
| 554 | + self.signed_corim = signed_corim |
| 555 | + print("CoRIM successfully signed with COSE-Sign1 using cwt library") |
| 556 | + return True |
| 557 | + |
| 558 | + except Exception as e: |
| 559 | + print(f"Error signing CoRIM with cwt: {e}") |
| 560 | + print("CoRIM generation is working, but signing failed.") |
| 561 | + return False |
| 562 | + |
| 563 | + def get_signed_corim(self) -> bytes: |
| 564 | + """Returns the signed CoRIM report (COSE-Sign1).""" |
| 565 | + return self.signed_corim |
| 566 | + |
265 | 567 | def get_signed_report(self) -> bytes: |
266 | 568 | """Returns the signed short form report (a JWS) as a bytes object. May |
267 | 569 | return a 'None' object if the report hasn't been signed yet. |
|
0 commit comments