Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add reOnboardingCount column to git.repositories table
-- This column tracks the number of times a repository has been re-onboarded.
-- It is used to identify unreachable commits by matching against activity.attributes.cycle=onboarding-{reOnboardingCount}
ALTER TABLE git.repositories
ADD COLUMN "reOnboardingCount" INTEGER default 0 NOT NULL;

COMMENT ON COLUMN git.repositories."reOnboardingCount" IS 'Tracks the number of times this repository has been re-onboarded. Used to identify unreachable commits via activity.attributes.cycle matching pattern onboarding-{reOnboardingCount}';
18 changes: 14 additions & 4 deletions services/apps/git_integration/src/crowdgit/database/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def insert_repository(url: str, priority: int = 0) -> str:
async def get_repository_by_url(url: str) -> dict[str, Any] | None:
"""Get repository by URL"""
sql_query = """
SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "stuckRequiresReOnboard"
SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount"
FROM git.repositories
WHERE url = $1 AND "deletedAt" IS NULL
"""
Expand All @@ -49,7 +49,7 @@ async def get_recently_processed_repository_by_url(url: str) -> Repository | Non
Used to check if a repository needs reprocessing based on the update interval.
"""
sql_query = """
SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "segmentId", "stuckRequiresReOnboard"
SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "segmentId", "stuckRequiresReOnboard", "reOnboardingCount"
FROM git.repositories
WHERE url = $1
AND "deletedAt" IS NULL
Expand Down Expand Up @@ -88,7 +88,7 @@ async def acquire_onboarding_repo() -> Repository | None:
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard"
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount"
"""
return await acquire_repository(
onboarding_repo_sql_query,
Expand Down Expand Up @@ -138,7 +138,7 @@ async def acquire_recurrent_repo() -> Repository | None:
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard"
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount"
"""
states_to_exclude = (
RepositoryState.PENDING,
Expand Down Expand Up @@ -220,6 +220,16 @@ async def update_last_processed_commit(repo_id: str, commit_hash: str, branch: s
return str(result)


async def increase_re_onboarding_count(repo_id: str):
Copy link

Copilot AI Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The increase_re_onboarding_count function is missing a docstring. Following the pattern of other functions in this file, it should include a docstring that describes its purpose. For example:

async def increase_re_onboarding_count(repo_id: str):
    """
    Increment the re-onboarding count for a repository.
    
    This is called when a repository requires re-onboarding, typically
    when commits become unreachable and need to be reprocessed.
    """
Suggested change
async def increase_re_onboarding_count(repo_id: str):
async def increase_re_onboarding_count(repo_id: str):
"""
Increment the re-onboarding count for a repository.
This is called when a repository requires re-onboarding, typically
when commits become unreachable and need to be reprocessed.
"""

Copilot uses AI. Check for mistakes.
sql_query = """
UPDATE git.repositories
SET "reOnboardingCount" = "reOnboardingCount" + 1,
"updatedAt" = NOW()
WHERE id = $1
"""
return await execute(sql_query, (repo_id,))


async def mark_repo_as_processed(repo_id: str, repo_state: RepositoryState):
sql_query = """
UPDATE git.repositories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class Repository(BaseModel):
default=False,
description="Indicates if the stuck repository is resolved by a re-onboarding",
)
re_onboarding_count: int = Field(
...,
Copy link

Copilot AI Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The re_onboarding_count field is marked as required (...) but should have a default value of 0 to be consistent with the database schema. This will cause issues when creating Repository instances without providing this value. Change to:

re_onboarding_count: int = Field(
    default=0,
    description="Tracks the number of times this repository has been re-onboarded. Used to identify unreachable commits via activity.attributes.cycle matching pattern onboarding-{reOnboardingCount}",
)
Suggested change
...,
default=0,

Copilot uses AI. Check for mistakes.
description="Tracks the number of times this repository has been re-onboarded. Used to identify unreachable commits via activity.attributes.cycle matching pattern onboarding-{reOnboardingCount}",
)
created_at: datetime = Field(..., description="Creation timestamp")
updated_at: datetime = Field(..., description="Last update timestamp")

Expand Down Expand Up @@ -72,6 +76,7 @@ def from_db(cls, db_data: dict[str, Any]) -> Repository:
"lastMaintainerRunAt": "last_maintainer_run_at",
"forkedFrom": "forked_from",
"stuckRequiresReOnboard": "stuck_requires_re_onboard",
"reOnboardingCount": "re_onboarding_count",
}
for db_field, model_field in field_mapping.items():
if db_field in repo_data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ def create_activity(
member: dict,
source_id: str,
segment_id: str,
re_onboarding_count: int,
source_parent_id: str = "",
) -> dict:
"""
Expand All @@ -366,6 +367,8 @@ def create_activity(
member: Member information dictionary
source_id: Source ID for the activity
segment_id: Segment identifier
re_onboarding_count: Number of times the repository has been re-onboarded.
Used to set activity.attributes.cycle when > 0.
source_parent_id: Parent source ID (optional)

Returns:
Expand Down Expand Up @@ -416,7 +419,7 @@ def create_activity(
# Pre-calculate commit attributes to avoid repeated lookups
insertions = commit.get("insertions", 0)
deletions = commit.get("deletions", 0)
return {
activity = {
"type": activity_type,
"timestamp": timestamp,
"sourceId": source_id,
Expand All @@ -436,6 +439,9 @@ def create_activity(
"member": processed_member,
"segmentId": segment_id,
}
if re_onboarding_count > 0:
activity["attributes"]["cycle"] = f"onboarding-{re_onboarding_count}"
return activity

def extract_activities(self, commit_message: list[str]) -> list[dict[str, dict[str, str]]]:
"""
Expand Down Expand Up @@ -500,7 +506,12 @@ def prepare_activity_for_db_and_queue(
return activity_db, activity_kafka

def create_activities_from_commit(
self, remote: str, commit: dict[str, Any], segment_id: str, integration_id: str
self,
remote: str,
commit: dict[str, Any],
segment_id: str,
integration_id: str,
re_onboarding_count: int,
) -> tuple[list[tuple], list[dict[str, Any]]]:
"""
Create activities from a commit with improved efficiency.
Expand All @@ -510,6 +521,8 @@ def create_activities_from_commit(
commit: The commit dictionary containing commit data
segment_id: Segment identifier
integration_id: Integration identifier
re_onboarding_count: Number of times the repository has been re-onboarded.
Used to set activity.attributes.cycle when > 0.

Returns:
Tuple of (activities_db, activities_queue) lists
Expand Down Expand Up @@ -537,6 +550,7 @@ def create_activities_from_commit(
member=author,
source_id=commit_hash,
segment_id=segment_id,
re_onboarding_count=re_onboarding_count,
)
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
activity, segment_id, integration_id
Expand Down Expand Up @@ -565,6 +579,7 @@ def create_activities_from_commit(
source_id=committer_source_id,
source_parent_id=commit_hash,
segment_id=segment_id,
re_onboarding_count=re_onboarding_count,
)
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
activity, segment_id, integration_id
Expand Down Expand Up @@ -599,6 +614,7 @@ def create_activities_from_commit(
source_id=source_id,
source_parent_id=commit_hash,
segment_id=segment_id,
re_onboarding_count=re_onboarding_count,
)
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
activity, segment_id, integration_id
Expand Down Expand Up @@ -704,7 +720,11 @@ async def process_commits_chunk(
commit = self._construct_commit_dict(commit_lines, numstats_text)
if self._validate_commit_data(commit):
activity_db_records, activity_kafka = self.create_activities_from_commit(
batch_info.remote, commit, repository.segment_id, repository.integration_id
batch_info.remote,
commit,
repository.segment_id,
repository.integration_id,
repository.re_onboarding_count,
)
activities_db.extend(activity_db_records)
activities_queue.extend(activity_kafka)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from crowdgit.database.crud import (
acquire_repo_for_processing,
get_recently_processed_repository_by_url,
increase_re_onboarding_count,
mark_repo_as_processed,
release_repo,
update_last_processed_commit,
Expand Down Expand Up @@ -262,6 +263,7 @@ async def _process_single_repository(self, repository: Repository):
branch=None,
)
processing_state = RepositoryState.PENDING
await increase_re_onboarding_count(repository.id)
except ParentRepoInvalidError as e:
logger.error(f"Parent repo validation failed: {repr(e)}")
processing_state = RepositoryState.REQUIRES_PARENT
Expand Down
Loading