Skip to content

Commit 63fefef

Browse files
Add checkpoint for alpine and maven
1 parent 891ce72 commit 63fefef

11 files changed

Lines changed: 306 additions & 50 deletions

File tree

minecode_pipelines/pipelines/mine_alpine.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
2121
# Visit https://github.com/aboutcode-org/scancode.io for support and download.
2222

23+
from scanpipe.pipes import federatedcode
24+
25+
from minecode_pipelines import pipes
2326
from minecode_pipelines.pipelines import MineCodeBasePipeline
2427

2528
from minecode_pipelines.pipes import alpine
@@ -28,21 +31,61 @@
2831
class MineAlpine(MineCodeBasePipeline):
2932
"""Mine PackageURLs from alpine index and publish them to FederatedCode."""
3033

34+
pipeline_config_repo = "https://github.com/aboutcode-data/minecode-pipelines-config/"
35+
checkpoint_path = "alpine/checkpoints.json"
36+
3137
@classmethod
3238
def steps(cls):
3339
return (
3440
cls.check_federatedcode_eligibility,
3541
cls.create_federatedcode_working_dir,
3642
cls.fetch_federation_config,
43+
cls.fetch_checkpoint,
3744
cls.mine_and_publish_alpine_packageurls,
45+
cls.save_checkpoint,
3846
cls.delete_working_dir,
3947
)
4048

49+
def fetch_checkpoint(self):
50+
"""Fetch the list of already-processed Alpine index URLs."""
51+
self.checkpoint_config_repo = federatedcode.clone_repository(
52+
repo_url=self.pipeline_config_repo,
53+
clone_path=self.working_path / "minecode-pipelines-config",
54+
logger=self.log,
55+
)
56+
checkpoint = pipes.get_checkpoint_from_file(
57+
cloned_repo=self.checkpoint_config_repo,
58+
path=self.checkpoint_path,
59+
)
60+
self.processed_indexes = set(checkpoint.get("processed_indexes", []))
61+
self.log(
62+
f"Loaded checkpoint with {len(self.processed_indexes)} "
63+
f"already-processed indexes."
64+
)
65+
4166
def mine_and_publish_alpine_packageurls(self):
4267
alpine.mine_and_publish_alpine_packageurls(
4368
data_cluster=self.data_cluster,
4469
checked_out_repos=self.checked_out_repos,
4570
working_path=self.working_path,
4671
commit_msg_func=self.commit_message,
4772
logger=self.log,
73+
processed_indexes=self.processed_indexes,
74+
checkpoint_func=self._save_checkpoint,
75+
)
76+
77+
def _save_checkpoint(self):
78+
"""Save current set of processed indexes as a checkpoint."""
79+
checkpoint = {"processed_indexes": sorted(self.processed_indexes)}
80+
self.log(
81+
f"Saving checkpoint with {len(self.processed_indexes)} processed indexes."
82+
)
83+
pipes.update_checkpoints_in_github(
84+
checkpoint=checkpoint,
85+
cloned_repo=self.checkpoint_config_repo,
86+
path=self.checkpoint_path,
87+
logger=self.log,
4888
)
89+
90+
def save_checkpoint(self):
91+
self._save_checkpoint()

minecode_pipelines/pipelines/mine_maven.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,27 +65,54 @@ def fetch_checkpoint_and_maven_index(self):
6565
logger=self.log,
6666
)
6767

68+
# Determine if we can resume from the last processed purl
69+
saved_checksum = checkpoint.get("index_checksum")
70+
current_checksum = self.maven_nexus_collector.index_checksum
71+
self.last_processed_purl = None
72+
73+
if saved_checksum and saved_checksum == current_checksum:
74+
self.last_processed_purl = checkpoint.get("last_processed_purl")
75+
if self.last_processed_purl:
76+
self.log(
77+
f"Index checksum matches. Resuming from: {self.last_processed_purl}"
78+
)
79+
elif saved_checksum and saved_checksum != current_checksum:
80+
self.log(
81+
"Index checksum changed. Starting from beginning."
82+
)
83+
6884
def mine_and_publish_maven_packageurls(self):
6985
_mine_and_publish_packageurls(
70-
packageurls=self.maven_nexus_collector.get_packages(),
86+
packageurls=self.maven_nexus_collector.get_packages(
87+
last_processed_purl=self.last_processed_purl,
88+
),
7189
total_package_count=None,
7290
data_cluster=self.data_cluster,
7391
checked_out_repos=self.checked_out_repos,
7492
working_path=self.working_path,
7593
append_purls=self.append_purls,
7694
commit_msg_func=self.commit_message,
7795
logger=self.log,
96+
checkpoint_func=self._save_checkpoint,
7897
)
7998

80-
def save_check_point(self):
99+
def _save_checkpoint(self):
100+
"""Save current progress as a checkpoint."""
81101
last_incremental = self.maven_nexus_collector.index_properties.get(
82102
"nexus.index.last-incremental"
83103
)
84-
checkpoint = {"last_incremental": last_incremental}
104+
checkpoint = {
105+
"last_incremental": last_incremental,
106+
"index_checksum": self.maven_nexus_collector.index_checksum,
107+
"last_processed_purl": self.maven_nexus_collector.last_processed_purl,
108+
}
85109
self.log(f"Saving checkpoint: {checkpoint}")
86110
pipes.update_checkpoints_in_github(
87111
checkpoint=checkpoint,
88112
cloned_repo=self.checkpoint_config_repo,
89113
path=self.checkpoint_path,
90114
logger=self.log,
91115
)
116+
117+
def save_check_point(self):
118+
self._save_checkpoint()

minecode_pipelines/pipes/alpine.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,10 +551,29 @@ def mine_and_publish_alpine_packageurls(
551551
working_path,
552552
commit_msg_func,
553553
logger,
554+
processed_indexes=None,
555+
checkpoint_func=None,
554556
):
555557
"""Yield PackageURLs from Alpine index."""
556558

557-
index_count = len(ALPINE_LINUX_APKINDEX_URLS)
559+
if processed_indexes is None:
560+
processed_indexes = set()
561+
562+
indexes_to_process = [
563+
url for url in ALPINE_LINUX_APKINDEX_URLS
564+
if url not in processed_indexes
565+
]
566+
567+
index_count = len(indexes_to_process)
568+
total_count = len(ALPINE_LINUX_APKINDEX_URLS)
569+
skipped_count = total_count - index_count
570+
571+
if skipped_count:
572+
logger(
573+
f"Skipping {skipped_count:,d} already-processed indexes. "
574+
f"Processing {index_count:,d} remaining indexes."
575+
)
576+
558577
progress = LoopProgress(
559578
total_iterations=index_count,
560579
logger=logger,
@@ -563,7 +582,7 @@ def mine_and_publish_alpine_packageurls(
563582

564583
logger(f"Mine PackageURL from {index_count:,d} alpine index.")
565584
alpine_collector = AlpineCollector()
566-
for index in progress.iter(ALPINE_LINUX_APKINDEX_URLS):
585+
for index in progress.iter(indexes_to_process):
567586
logger(f"Mine PackageURL from {index} index.")
568587
_mine_and_publish_packageurls(
569588
packageurls=alpine_collector.get_package_from_index(index),
@@ -575,3 +594,6 @@ def mine_and_publish_alpine_packageurls(
575594
commit_msg_func=commit_msg_func,
576595
logger=logger,
577596
)
597+
processed_indexes.add(index)
598+
if checkpoint_func:
599+
checkpoint_func()

minecode_pipelines/pipes/maven.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#
99

1010
import gzip
11+
import hashlib
1112
import io
1213
import os
1314
from collections import namedtuple
@@ -574,6 +575,7 @@ def __init__(
574575
index_location=None,
575576
index_properties_location=None,
576577
last_incremental=None,
578+
last_processed_purl=None,
577579
logger=None,
578580
):
579581
if index_location and last_incremental:
@@ -584,6 +586,7 @@ def __init__(
584586
)
585587

586588
self.downloads = []
589+
self.last_processed_purl = last_processed_purl
587590

588591
if index_properties_location:
589592
self.index_properties_location = index_properties_location
@@ -615,6 +618,25 @@ def __init__(
615618
self.index_location = index_download.path
616619
self.index_increment_locations = []
617620

621+
self.index_checksum = self._compute_index_checksum()
622+
623+
def _compute_index_checksum(self):
624+
"""Compute SHA-256 checksum of the index file(s) for checkpoint validation."""
625+
locations = []
626+
if self.index_location:
627+
locations.append(self.index_location)
628+
locations.extend(self.index_increment_locations)
629+
630+
if not locations:
631+
return None
632+
633+
sha256 = hashlib.sha256()
634+
for location in sorted(locations):
635+
with open(location, "rb") as f:
636+
for chunk in iter(lambda: f.read(8192), b""):
637+
sha256.update(chunk)
638+
return sha256.hexdigest()
639+
618640
def __del__(self):
619641
if self.downloads:
620642
for download in self.downloads:
@@ -658,8 +680,9 @@ def _fetch_index_increments(self, last_incremental):
658680
index_increment_downloads.append(index_increment)
659681
return index_increment_downloads
660682

661-
def _get_packages(self, content=None):
683+
def _get_packages(self, content=None, last_processed_purl=None):
662684
artifacts = get_artifacts(content, worthyness=is_worthy_artifact)
685+
skipping = bool(last_processed_purl)
663686

664687
for artifact in artifacts:
665688
# we cannot do much without these
@@ -723,17 +746,27 @@ def _get_packages(self, content=None):
723746
name=artifact_id,
724747
version=version,
725748
)
749+
750+
if skipping:
751+
if str(current_purl) == last_processed_purl:
752+
skipping = False
753+
continue
754+
755+
self.last_processed_purl = str(current_purl)
726756
yield current_purl, [package.purl]
727757

728758
def _get_packages_from_index_increments(self):
729759
for index_increment in self.index_increment_locations:
730760
yield self._get_packages(content=index_increment)
731761

732-
def get_packages(self):
762+
def get_packages(self, last_processed_purl=None):
733763
"""Yield Package objects from maven index or index increments"""
734764
packages = []
735765
if self.index_increment_locations:
736766
packages = chain(self._get_packages_from_index_increments())
737767
elif self.index_location:
738-
packages = self._get_packages(content=self.index_location)
768+
packages = self._get_packages(
769+
content=self.index_location,
770+
last_processed_purl=last_processed_purl,
771+
)
739772
return packages

minecode_pipelines/tests/data/alpine/expected_packages.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
{
33
"type": "apk",
44
"namespace": "alpine",
5-
"name": "prspkt",
5+
"name": "2bwm",
66
"version": "0.3-r2",
77
"qualifiers": {
88
"arch": "x86_64",
@@ -75,12 +75,12 @@
7575
"repository_download_url": null,
7676
"api_data_url": null,
7777
"datasource_id": "alpine_metadata",
78-
"purl": "pkg:apk/alpine/prspkt@0.3-r2?arch=x86_64&distro=v3.22"
78+
"purl": "pkg:apk/alpine/2bwm@0.3-r2?arch=x86_64&distro=v3.22"
7979
},
8080
{
8181
"type": "apk",
8282
"namespace": "alpine",
83-
"name": "prspkt",
83+
"name": "2bwm-doc",
8484
"version": "0.3-r2",
8585
"qualifiers": {
8686
"arch": "x86_64",
@@ -153,6 +153,6 @@
153153
"repository_download_url": null,
154154
"api_data_url": null,
155155
"datasource_id": "alpine_metadata",
156-
"purl": "pkg:apk/alpine/prspkt@0.3-r2?arch=x86_64&distro=v3.22"
156+
"purl": "pkg:apk/alpine/2bwm-doc@0.3-r2?arch=x86_64&distro=v3.22"
157157
}
158158
]
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#Mon Oct 29 01:28:33 UTC 2018
2+
nexus.index.id=central
3+
nexus.index.chain-id=1318453614498
4+
nexus.index.timestamp=20181029012159.470 +0000
5+
nexus.index.last-incremental=445
6+
nexus.index.time=20120615133728.952 +0000

minecode_pipelines/tests/pipes/test_alpine.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#
99

1010
import os
11+
from unittest.mock import patch
1112

1213
from commoncode.testcase import check_against_expected_json_file
1314
from commoncode.testcase import FileBasedTesting
@@ -27,3 +28,51 @@ def test_parse_apkindex_and_build_package(self):
2728
packages.append(pd.to_dict())
2829
expected_loc = self.get_test_loc("alpine/expected_packages.json")
2930
check_against_expected_json_file(packages, expected_loc, regen=False)
31+
32+
def test_mine_and_publish_skips_processed_indexes(self):
33+
"""Test that already-processed index URLs are skipped."""
34+
all_urls = alpine.ALPINE_LINUX_APKINDEX_URLS
35+
processed_indexes = set(all_urls[1:])
36+
calls = []
37+
38+
def mock_mine_and_publish(**kwargs):
39+
calls.append(kwargs.get("packageurls"))
40+
41+
with patch(
42+
"minecode_pipelines.pipes.alpine._mine_and_publish_packageurls",
43+
side_effect=mock_mine_and_publish,
44+
):
45+
alpine.mine_and_publish_alpine_packageurls(
46+
data_cluster=None,
47+
checked_out_repos={},
48+
working_path=None,
49+
commit_msg_func=lambda *a, **kw: "test",
50+
logger=lambda msg: None,
51+
processed_indexes=processed_indexes,
52+
)
53+
self.assertEqual(len(calls), 1)
54+
55+
def test_mine_and_publish_updates_processed_indexes(self):
56+
"""Test that processed_indexes is updated after each index."""
57+
processed_indexes = set()
58+
59+
with patch(
60+
"minecode_pipelines.pipes.alpine._mine_and_publish_packageurls",
61+
):
62+
original_urls = alpine.ALPINE_LINUX_APKINDEX_URLS
63+
alpine.ALPINE_LINUX_APKINDEX_URLS = original_urls[:2]
64+
try:
65+
alpine.mine_and_publish_alpine_packageurls(
66+
data_cluster=None,
67+
checked_out_repos={},
68+
working_path=None,
69+
commit_msg_func=lambda *a, **kw: "test",
70+
logger=lambda msg: None,
71+
processed_indexes=processed_indexes,
72+
)
73+
finally:
74+
alpine.ALPINE_LINUX_APKINDEX_URLS = original_urls
75+
76+
self.assertEqual(len(processed_indexes), 2)
77+
self.assertIn(original_urls[0], processed_indexes)
78+
self.assertIn(original_urls[1], processed_indexes)

minecode_pipelines/tests/pipes/test_conan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020

2121
class ConanPipelineTests(TestCase):
22-
def test_collect_packages_from_conan_calls_write(self, mock_write):
22+
def test_collect_packages_from_conan_calls_write(self):
2323
packages_file = DATA_DIR / "cairo" / "cairo-config.yml"
2424
expected_file = DATA_DIR / "expected-cairo-purls.yml"
2525

0 commit comments

Comments
 (0)