From a94c27fd684214a384295b99a539d42dd9fa98e4 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Mon, 8 Dec 2025 11:53:12 +0100 Subject: [PATCH 1/2] feat: track reOnboarding counts and unreachable commits --- .../src/crowdgit/database/crud.py | 18 ++++++++++--- .../src/crowdgit/models/repository.py | 5 ++++ .../services/commit/commit_service.py | 26 ++++++++++++++++--- .../src/crowdgit/worker/repository_worker.py | 2 ++ 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/database/crud.py b/services/apps/git_integration/src/crowdgit/database/crud.py index 066ec1f917..c35712b5bb 100644 --- a/services/apps/git_integration/src/crowdgit/database/crud.py +++ b/services/apps/git_integration/src/crowdgit/database/crud.py @@ -32,7 +32,7 @@ async def insert_repository(url: str, priority: int = 0) -> str: async def get_repository_by_url(url: str) -> dict[str, Any] | None: """Get repository by URL""" sql_query = """ - SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "stuckRequiresReOnboard" + SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount" FROM git.repositories WHERE url = $1 AND "deletedAt" IS NULL """ @@ -49,7 +49,7 @@ async def get_recently_processed_repository_by_url(url: str) -> Repository | Non Used to check if a repository needs reprocessing based on the update interval. """ sql_query = """ - SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "segmentId", "stuckRequiresReOnboard" + SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "segmentId", "stuckRequiresReOnboard", "reOnboardingCount" FROM git.repositories WHERE url = $1 AND "deletedAt" IS NULL @@ -88,7 +88,7 @@ async def acquire_onboarding_repo() -> Repository | None: LIMIT 1 FOR UPDATE SKIP LOCKED ) - RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard" + RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount" """ return await acquire_repository( onboarding_repo_sql_query, @@ -138,7 +138,7 @@ async def acquire_recurrent_repo() -> Repository | None: LIMIT 1 FOR UPDATE SKIP LOCKED ) - RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard" + RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount" """ states_to_exclude = ( RepositoryState.PENDING, @@ -220,6 +220,16 @@ async def update_last_processed_commit(repo_id: str, commit_hash: str, branch: s return str(result) +async def increase_re_onboarding_count(repo_id: str): + sql_query = """ + UPDATE git.repositories + SET "reOnboardingCount" = "reOnboardingCount" + 1, + "updatedAt" = NOW() + WHERE id = $1 + """ + return await execute(sql_query, (repo_id,)) + + async def mark_repo_as_processed(repo_id: str, repo_state: RepositoryState): sql_query = """ UPDATE git.repositories diff --git a/services/apps/git_integration/src/crowdgit/models/repository.py b/services/apps/git_integration/src/crowdgit/models/repository.py index cb53e69bc7..ad99fda56f 100644 --- a/services/apps/git_integration/src/crowdgit/models/repository.py +++ b/services/apps/git_integration/src/crowdgit/models/repository.py @@ -45,6 +45,10 @@ class Repository(BaseModel): default=False, description="Indicates if the stuck repository is resolved by a re-onboarding", ) + re_onboarding_count: int = Field( + ..., + description="Tracks the number of times this repository has been re-onboarded. Used to identify unreachable commits via activity.attributes.cycle matching pattern onboarding-{reOnboardingCount}", + ) created_at: datetime = Field(..., description="Creation timestamp") updated_at: datetime = Field(..., description="Last update timestamp") @@ -72,6 +76,7 @@ def from_db(cls, db_data: dict[str, Any]) -> Repository: "lastMaintainerRunAt": "last_maintainer_run_at", "forkedFrom": "forked_from", "stuckRequiresReOnboard": "stuck_requires_re_onboard", + "reOnboardingCount": "re_onboarding_count", } for db_field, model_field in field_mapping.items(): if db_field in repo_data: diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 1cb71c5908..334deafdf0 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -354,6 +354,7 @@ def create_activity( member: dict, source_id: str, segment_id: str, + re_onboarding_count: int, source_parent_id: str = "", ) -> dict: """ @@ -366,6 +367,8 @@ def create_activity( member: Member information dictionary source_id: Source ID for the activity segment_id: Segment identifier + re_onboarding_count: Number of times the repository has been re-onboarded. + Used to set activity.attributes.cycle when > 0. source_parent_id: Parent source ID (optional) Returns: @@ -416,7 +419,7 @@ def create_activity( # Pre-calculate commit attributes to avoid repeated lookups insertions = commit.get("insertions", 0) deletions = commit.get("deletions", 0) - return { + activity = { "type": activity_type, "timestamp": timestamp, "sourceId": source_id, @@ -436,6 +439,9 @@ def create_activity( "member": processed_member, "segmentId": segment_id, } + if re_onboarding_count > 0: + activity["attributes"]["cycle"] = f"onboarding-{re_onboarding_count}" + return activity def extract_activities(self, commit_message: list[str]) -> list[dict[str, dict[str, str]]]: """ @@ -500,7 +506,12 @@ def prepare_activity_for_db_and_queue( return activity_db, activity_kafka def create_activities_from_commit( - self, remote: str, commit: dict[str, Any], segment_id: str, integration_id: str + self, + remote: str, + commit: dict[str, Any], + segment_id: str, + integration_id: str, + re_onboarding_count: int, ) -> tuple[list[tuple], list[dict[str, Any]]]: """ Create activities from a commit with improved efficiency. @@ -510,6 +521,8 @@ def create_activities_from_commit( commit: The commit dictionary containing commit data segment_id: Segment identifier integration_id: Integration identifier + re_onboarding_count: Number of times the repository has been re-onboarded. + Used to set activity.attributes.cycle when > 0. Returns: Tuple of (activities_db, activities_queue) lists @@ -537,6 +550,7 @@ def create_activities_from_commit( member=author, source_id=commit_hash, segment_id=segment_id, + re_onboarding_count=re_onboarding_count, ) activity_db, activity_kafka = self.prepare_activity_for_db_and_queue( activity, segment_id, integration_id @@ -565,6 +579,7 @@ def create_activities_from_commit( source_id=committer_source_id, source_parent_id=commit_hash, segment_id=segment_id, + re_onboarding_count=re_onboarding_count, ) activity_db, activity_kafka = self.prepare_activity_for_db_and_queue( activity, segment_id, integration_id @@ -599,6 +614,7 @@ def create_activities_from_commit( source_id=source_id, source_parent_id=commit_hash, segment_id=segment_id, + re_onboarding_count=re_onboarding_count, ) activity_db, activity_kafka = self.prepare_activity_for_db_and_queue( activity, segment_id, integration_id @@ -704,7 +720,11 @@ async def process_commits_chunk( commit = self._construct_commit_dict(commit_lines, numstats_text) if self._validate_commit_data(commit): activity_db_records, activity_kafka = self.create_activities_from_commit( - batch_info.remote, commit, repository.segment_id, repository.integration_id + batch_info.remote, + commit, + repository.segment_id, + repository.integration_id, + repository.re_onboarding_count, ) activities_db.extend(activity_db_records) activities_queue.extend(activity_kafka) diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 5133bef014..877480195d 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -4,6 +4,7 @@ from crowdgit.database.crud import ( acquire_repo_for_processing, get_recently_processed_repository_by_url, + increase_re_onboarding_count, mark_repo_as_processed, release_repo, update_last_processed_commit, @@ -262,6 +263,7 @@ async def _process_single_repository(self, repository: Repository): branch=None, ) processing_state = RepositoryState.PENDING + await increase_re_onboarding_count(repository.id) except ParentRepoInvalidError as e: logger.error(f"Parent repo validation failed: {repr(e)}") processing_state = RepositoryState.REQUIRES_PARENT From 3f726015641fbea79373ff95a83807236b92471b Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Mon, 8 Dec 2025 11:54:23 +0100 Subject: [PATCH 2/2] chore: db migration --- .../U1765185575__addReOnboardingCountToGitRepos.sql | 0 .../V1765185575__addReOnboardingCountToGitRepos.sql | 7 +++++++ 2 files changed, 7 insertions(+) create mode 100644 backend/src/database/migrations/U1765185575__addReOnboardingCountToGitRepos.sql create mode 100644 backend/src/database/migrations/V1765185575__addReOnboardingCountToGitRepos.sql diff --git a/backend/src/database/migrations/U1765185575__addReOnboardingCountToGitRepos.sql b/backend/src/database/migrations/U1765185575__addReOnboardingCountToGitRepos.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1765185575__addReOnboardingCountToGitRepos.sql b/backend/src/database/migrations/V1765185575__addReOnboardingCountToGitRepos.sql new file mode 100644 index 0000000000..6fa85ddc33 --- /dev/null +++ b/backend/src/database/migrations/V1765185575__addReOnboardingCountToGitRepos.sql @@ -0,0 +1,7 @@ +-- Add reOnboardingCount column to git.repositories table +-- This column tracks the number of times a repository has been re-onboarded. +-- It is used to identify unreachable commits by matching against activity.attributes.cycle=onboarding-{reOnboardingCount} +ALTER TABLE git.repositories +ADD COLUMN "reOnboardingCount" INTEGER default 0 NOT NULL; + +COMMENT ON COLUMN git.repositories."reOnboardingCount" IS 'Tracks the number of times this repository has been re-onboarded. Used to identify unreachable commits via activity.attributes.cycle matching pattern onboarding-{reOnboardingCount}'; \ No newline at end of file