From 70a212846ee1432fe29cf4b33dc106810ec0f617 Mon Sep 17 00:00:00 2001 From: attiasas Date: Sun, 22 Mar 2026 12:03:07 +0200 Subject: [PATCH 1/3] Add cache and parallel run to Git Count Contributors --- cli/docs/flags.go | 7 +- cli/gitcommands.go | 22 ++ commands/git/contributors/cache.go | 140 ++++++++++ commands/git/contributors/cache_test.go | 63 +++++ .../git/contributors/countcontributors.go | 248 +++++++++++++++--- .../contributors/countcontributors_test.go | 164 +++++++++++- .../git/contributors/mock_vcs_client_test.go | 164 ++++++++++++ utils/paths.go | 11 +- 8 files changed, 784 insertions(+), 35 deletions(-) create mode 100644 commands/git/contributors/cache.go create mode 100644 commands/git/contributors/cache_test.go create mode 100644 commands/git/contributors/mock_vcs_client_test.go diff --git a/cli/docs/flags.go b/cli/docs/flags.go index 07a4149ed..a4d3f82af 100644 --- a/cli/docs/flags.go +++ b/cli/docs/flags.go @@ -162,6 +162,7 @@ const ( IncludeCachedPackages = "include-cached-packages" // Unique git flags + gitPrefix = "git-" InputFile = "input-file" ScmType = "scm-type" ScmApiUrl = "scm-api-url" @@ -170,6 +171,8 @@ const ( RepoName = "repo-name" Months = "months" DetailedSummary = "detailed-summary" + CacheValidity = "cache-validity" + GitThreads = gitPrefix + Threads ) // Mapping between security commands (key) and their flags (key). 
@@ -221,7 +224,7 @@ var commandFlags = map[string][]string{ CurationOutput, WorkingDirs, Threads, RequirementsFile, InsecureTls, useWrapperAudit, UseIncludedBuilds, SolutionPath, DockerImageName, IncludeCachedPackages, }, GitCountContributors: { - InputFile, ScmType, ScmApiUrl, Token, Owner, RepoName, Months, DetailedSummary, InsecureTls, + InputFile, ScmType, ScmApiUrl, Token, Owner, RepoName, Months, DetailedSummary, InsecureTls, GitThreads, CacheValidity, }, SastServer: { Port, @@ -366,6 +369,8 @@ var flagsMap = map[string]components.Flag{ RepoName: components.NewStringFlag(RepoName, "List of semicolon-separated(;) repositories names to analyze, If not provided all repositories related to the provided owner will be analyzed."), Months: components.NewStringFlag(Months, "Number of months to analyze.", components.WithIntDefaultValue(contributors.DefaultContContributorsMonths)), DetailedSummary: components.NewBoolFlag(DetailedSummary, "Set to true to get a contributors detailed summary."), + CacheValidity: components.NewStringFlag(CacheValidity, "Number of days a cached repository result remains valid. 
Set to 0 to ignore cache and force a full re-scan.", components.WithIntDefaultValue(contributors.DefaultCacheValidity)), + GitThreads: components.NewStringFlag(Threads, "Number of parallel threads for scanning repositories.", components.WithIntDefaultValue(contributors.DefaultThreads)), } func GetCommandFlags(cmdKey string) []components.Flag { diff --git a/cli/gitcommands.go b/cli/gitcommands.go index 8ea37082f..d96460741 100644 --- a/cli/gitcommands.go +++ b/cli/gitcommands.go @@ -181,6 +181,28 @@ func GetCountContributorsParams(c *components.Context) (*contributors.CountContr } // DetailedSummery params.DetailedSummery = c.GetBoolFlagValue(flags.DetailedSummary) + // CacheValidity + if !c.IsFlagSet(flags.CacheValidity) { + params.CacheValidity = contributors.DefaultCacheValidity + } else { + cacheValidity, err := c.GetIntFlagValue(flags.CacheValidity) + if err != nil { + return nil, err + } + if cacheValidity < 0 { + return nil, errorutils.CheckErrorf("Invalid value for '--%s=%d'. Must be 0 (skip cache) or a positive number of days.", flags.CacheValidity, cacheValidity) + } + params.CacheValidity = cacheValidity + } + // Threads + threads, err := c.GetIntFlagValue(flags.Threads) + if err != nil { + return nil, err + } + if threads <= 0 { + return nil, errorutils.CheckErrorf("Invalid value for '--%s=%d'. If set, should be a positive number.", flags.GitThreads, threads) + } + params.Threads = threads return ¶ms, nil } diff --git a/commands/git/contributors/cache.go b/commands/git/contributors/cache.go new file mode 100644 index 000000000..b1560a583 --- /dev/null +++ b/commands/git/contributors/cache.go @@ -0,0 +1,140 @@ +package contributors + +import ( + "crypto/sha256" + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" + + secutils "github.com/jfrog/jfrog-cli-security/utils" + "github.com/jfrog/jfrog-client-go/utils/log" +) + +// cacheContributorEntry serializes one entry of the uniqueContributors map. 
+// Go's encoding/json does not support struct keys in maps, so we use a slice instead. +type cacheContributorEntry struct { + Key BasicContributor `json:"key"` + Value Contributor `json:"value"` +} + +const ( + DefaultCacheValidity = 3 // days +) + +// repoCacheFile holds the data persisted to disk for one fully-scanned repository. +type repoCacheFile struct { + Repo string `json:"repo"` + ScannedAt string `json:"scanned_at"` + Months int `json:"last_months_analyzed"` + UniqueContributors []cacheContributorEntry `json:"unique_contributors"` + DetailedContributors map[string]map[string]ContributorDetailedSummary `json:"detailed_contributors,omitempty"` + DetailedRepos map[string]map[string]RepositoryDetailedSummary `json:"detailed_repos,omitempty"` + TotalCommits int `json:"total_commits"` + Skipped bool `json:"skipped,omitempty"` +} + +// getRepoCacheDir returns (and creates) the cache directory for a specific combination of +// scm-type / scm-api-url / owner / months so caches never collide across configurations. +func getRepoCacheDir(params BasicGitServerParams, months int) (string, error) { + base, err := secutils.GetContributorsCacheDir() + if err != nil { + return "", fmt.Errorf("failed to determine contributors cache directory: %w", err) + } + key := fmt.Sprintf("%d|%s|%s|%d", params.ScmType, params.ScmApiUrl, params.Owner, months) + hash := fmt.Sprintf("%x", sha256.Sum256([]byte(key))) + dir := filepath.Join(base, hash) + if err = os.MkdirAll(dir, 0700); err != nil { + return "", fmt.Errorf("failed to create cache directory %s: %w", dir, err) + } + return dir, nil +} + +// readRepoCache reads the cache entry for repo. Returns nil when the file does not exist +// or when the entry is older than maxAge. maxAge == 0 means "always expired" (skip cache). 
+func readRepoCache(cacheDir, repo string, maxAge time.Duration) *repoScanResult { + if maxAge <= 0 { + return nil + } + path := filepath.Join(cacheDir, sanitizeFilename(repo)+".json") + data, err := os.ReadFile(path) + if err != nil { + // File simply doesn't exist yet — not an error worth logging. + return nil + } + var entry repoCacheFile + if err = json.Unmarshal(data, &entry); err != nil { + log.Warn("Contributors cache: failed to parse cache file %s: %v", path, err) + return nil + } + scannedAt, err := time.Parse(time.RFC3339, entry.ScannedAt) + if err != nil { + log.Warn("Contributors cache: invalid scanned_at in %s: %v", path, err) + return nil + } + if time.Since(scannedAt) > maxAge { + log.Debug("Contributors cache: entry for %q expired (scanned %s ago)", repo, time.Since(scannedAt).Round(time.Second)) + return nil + } + log.Debug("Contributors cache: using cached data for repo %q (scanned at %s)", repo, entry.ScannedAt) + uniqueContributors := make(map[BasicContributor]Contributor, len(entry.UniqueContributors)) + for _, e := range entry.UniqueContributors { + uniqueContributors[e.Key] = e.Value + } + return &repoScanResult{ + repo: entry.Repo, + uniqueContributors: uniqueContributors, + detailedContributors: entry.DetailedContributors, + detailedRepos: entry.DetailedRepos, + totalCommits: entry.TotalCommits, + skipped: entry.Skipped, + } +} + +// writeRepoCache persists the scan result for repo to disk atomically (write tmp → rename). 
+func writeRepoCache(cacheDir, repo string, result repoScanResult, months int) { + uniqueEntries := make([]cacheContributorEntry, 0, len(result.uniqueContributors)) + for k, v := range result.uniqueContributors { + uniqueEntries = append(uniqueEntries, cacheContributorEntry{Key: k, Value: v}) + } + entry := repoCacheFile{ + Repo: result.repo, + ScannedAt: time.Now().UTC().Format(time.RFC3339), + Months: months, + UniqueContributors: uniqueEntries, + DetailedContributors: result.detailedContributors, + DetailedRepos: result.detailedRepos, + TotalCommits: result.totalCommits, + Skipped: result.skipped, + } + data, err := json.Marshal(entry) + if err != nil { + log.Warn("Contributors cache: failed to marshal cache for repo %q: %v", repo, err) + return + } + finalPath := filepath.Join(cacheDir, sanitizeFilename(repo)+".json") + tmpPath := finalPath + ".tmp" + if err = os.WriteFile(tmpPath, data, 0600); err != nil { + log.Warn("Contributors cache: failed to write tmp file %s: %v", tmpPath, err) + return + } + if err = os.Rename(tmpPath, finalPath); err != nil { + log.Warn("Contributors cache: failed to rename %s → %s: %v", tmpPath, finalPath, err) + _ = os.Remove(tmpPath) + } +} + +// sanitizeFilename replaces characters that are unsafe in file names (e.g. '/' in repo paths). +func sanitizeFilename(name string) string { + safe := make([]byte, len(name)) + for i := range name { + c := name[i] + if c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' 
|| c == '"' || c == '<' || c == '>' || c == '|' { + safe[i] = '_' + } else { + safe[i] = c + } + } + return string(safe) +} diff --git a/commands/git/contributors/cache_test.go b/commands/git/contributors/cache_test.go new file mode 100644 index 000000000..69a98d27e --- /dev/null +++ b/commands/git/contributors/cache_test.go @@ -0,0 +1,63 @@ +package contributors + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWriteAndReadRepoCache(t *testing.T) { + dir := t.TempDir() + result := repoScanResult{ + repo: "my-org/my-repo", + totalCommits: 42, + uniqueContributors: map[BasicContributor]Contributor{ + {Email: "alice@example.com", Repo: "my-org/my-repo"}: { + Email: "alice@example.com", + Name: "Alice", + RepoLastCommit: RepoLastCommit{ + Repo: "my-org/my-repo", + LastCommit: LastCommit{Date: "2024-01-01T00:00:00Z", Hash: "abc123"}, + }, + }, + }, + } + + writeRepoCache(dir, "my-org/my-repo", result, 3) + + got := readRepoCache(dir, "my-org/my-repo", 24*time.Hour) + require.NotNil(t, got) + assert.Equal(t, result.repo, got.repo) + assert.Equal(t, result.totalCommits, got.totalCommits) + // uniqueContributors round-trips through a []cacheContributorEntry slice, verify key presence. + for k, v := range result.uniqueContributors { + gotVal, ok := got.uniqueContributors[k] + assert.True(t, ok, "expected key %v in cached result", k) + assert.Equal(t, v, gotVal) + } +} + +func TestReadRepoCache_Expired(t *testing.T) { + dir := t.TempDir() + result := repoScanResult{repo: "repo", totalCommits: 1} + writeRepoCache(dir, "repo", result, 1) + + // maxAge of 1 nanosecond is guaranteed to be exceeded by the time we read. + got := readRepoCache(dir, "repo", 1*time.Nanosecond) + assert.Nil(t, got) +} + +func TestReadRepoCache_ZeroMaxAge(t *testing.T) { + dir := t.TempDir() + // maxAge == 0 short-circuits before any file I/O. 
+ got := readRepoCache(dir, "any-repo", 0) + assert.Nil(t, got) +} + +func TestReadRepoCache_Missing(t *testing.T) { + dir := t.TempDir() + got := readRepoCache(dir, "nonexistent-repo", 24*time.Hour) + assert.Nil(t, got) +} diff --git a/commands/git/contributors/countcontributors.go b/commands/git/contributors/countcontributors.go index 32d2e528e..0537fab4d 100644 --- a/commands/git/contributors/countcontributors.go +++ b/commands/git/contributors/countcontributors.go @@ -6,13 +6,16 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/google/go-github/v56/github" "github.com/jfrog/froggit-go/vcsclient" "github.com/jfrog/froggit-go/vcsutils" + "github.com/jfrog/gofrog/parallel" "github.com/jfrog/jfrog-cli-core/v2/utils/config" "github.com/jfrog/jfrog-cli-security/utils/results/output" + clientutils "github.com/jfrog/jfrog-client-go/utils" ioUtils "github.com/jfrog/jfrog-client-go/utils/io" "github.com/jfrog/jfrog-client-go/utils/log" "golang.org/x/exp/maps" @@ -27,6 +30,7 @@ const ( Gitlab = scmTypeName("gitlab") BitbucketServer = scmTypeName("bitbucket") DefaultContContributorsMonths = 1 + DefaultThreads = 10 getCommitsRetryNumber = 5 GithubTokenEnvVar = "JFROG_CLI_GITHUB_TOKEN" // #nosec G101 GitlabTokenEnvVar = "JFROG_CLI_GITLAB_TOKEN" // #nosec G101 @@ -171,6 +175,10 @@ type CountContributorsParams struct { MonthsNum int // Detailed summery flag. DetailedSummery bool + // Number of parallel threads to use when scanning repositories. + Threads int + // Number of days a cached repository result remains valid. 0 = skip cache. + CacheValidity int // Progress bar. Progress ioUtils.ProgressMgr } @@ -212,8 +220,28 @@ func (vs *ScmType) GetOptionalScmTypeTokenEnvVars() string { return fmt.Sprintf("%s or %s", streamsStr, envVars[len(envVars)-1]) } +// repoScanResult holds the result of scanning a single repository. 
+type repoScanResult struct { + repo string + uniqueContributors map[BasicContributor]Contributor + detailedContributors map[string]map[string]ContributorDetailedSummary + detailedRepos map[string]map[string]RepositoryDetailedSummary + totalCommits int + skipped bool +} + +// vcsServerScanResult holds the aggregated results from scanning all repos of one VCS server. +type vcsServerScanResult struct { + uniqueContributors map[BasicContributor]Contributor + detailedContributors map[string]map[string]ContributorDetailedSummary + detailedRepos map[string]map[string]RepositoryDetailedSummary + scannedRepos []string + skippedRepos []string + totalCommits int +} + func (cc *CountContributorsCommand) Run() error { - log.Info("The CLI outputs may include an estimation of the contributing developers based on the input provided by the user. They may be based on third-party resources and databases and JFrog does not guarantee that the CLI outputs are accurate and/or complete. The CLI outputs are not legal advice and you are solely responsible for your use of it. CLI outputs are provided “as is” and any representation or warranty of or concerning any third-party technology is strictly between the user and the third-party owner or distributor of the third-party technology.") + log.Info("The CLI outputs may include an estimation of the contributing developers based on the input provided by the user. They may be based on third-party resources and databases and JFrog does not guarantee that the CLI outputs are accurate and/or complete. The CLI outputs are not legal advice and you are solely responsible for your use of it. 
CLI outputs are provided \"as is\" and any representation or warranty of or concerning any third-party technology is strictly between the user and the third-party owner or distributor of the third-party technology.") if cc.Progress != nil { cc.Progress.SetHeadlineMsg("Calculating Git contributors information") } @@ -224,20 +252,51 @@ func (cc *CountContributorsCommand) Run() error { var totalScannedRepos []string var totalSkippedRepos []string totalCommitsNumber := 0 + vcsCountContributors, err := cc.getVcsCountContributors() if err != nil { return err } - // Scan all repos from all provided git servers. - for _, vcc := range vcsCountContributors { - repositories, err := vcc.getRepositoriesListToScan() - if err != nil { - return err + + // Scan all VCS servers in parallel. + serverResults := make([]vcsServerScanResult, len(vcsCountContributors)) + var serverResultsMu sync.Mutex + threads := cc.Threads + if threads <= 0 { + threads = DefaultThreads + } + runner := parallel.NewRunner(threads, 20000, false) + for i, vcc := range vcsCountContributors { + if _, addErr := runner.AddTaskWithError(func(threadId int) error { + logPrefix := clientutils.GetLogMsgPrefix(threadId, false) + log.Info(logPrefix + fmt.Sprintf("Scanning VCS server: owner=%q, url=%q", vcc.params.Owner, vcc.params.ScmApiUrl)) + repositories, repoListErr := vcc.getRepositoriesListToScan() + if repoListErr != nil { + return repoListErr + } + log.Info(logPrefix + fmt.Sprintf("Found %d repositories for %q, scanning in parallel", len(repositories), vcc.params.Owner)) + result := vcc.scanAndCollectCommitsInfo(repositories) + serverResultsMu.Lock() + serverResults[i] = result + serverResultsMu.Unlock() + return nil + }, func(taskErr error) { + log.Error("Error scanning VCS server: %v", taskErr) + }); addErr != nil { + return fmt.Errorf("failed to add VCS server scan task: %w", addErr) } - scannedRepos, skippedRepos, commitsNumber := vcc.scanAndCollectCommitsInfo(repositories, uniqueContributors, 
detailedContributors, detailedRepos) - totalScannedRepos = append(totalScannedRepos, scannedRepos...) - totalSkippedRepos = append(totalSkippedRepos, skippedRepos...) - totalCommitsNumber += commitsNumber + } + runner.Done() + runner.Run() + + // Merge server results sequentially. + for _, sr := range serverResults { + mergeContributors(uniqueContributors, sr.uniqueContributors) + mergeDetailedContributors(detailedContributors, sr.detailedContributors) + mergeDetailedRepos(detailedRepos, sr.detailedRepos) + totalScannedRepos = append(totalScannedRepos, sr.scannedRepos...) + totalSkippedRepos = append(totalSkippedRepos, sr.skippedRepos...) + totalCommitsNumber += sr.totalCommits } // Create the report. @@ -275,7 +334,13 @@ func (cc *CountContributorsCommand) getVcsCountContributors() ([]VcsCountContrib if err = param.validate(); err != nil { return nil, err } - p := CountContributorsParams{BasicGitServerParams: param, MonthsNum: cc.MonthsNum, DetailedSummery: cc.DetailedSummery} + p := CountContributorsParams{ + BasicGitServerParams: param, + MonthsNum: cc.MonthsNum, + DetailedSummery: cc.DetailedSummery, + Threads: cc.Threads, + CacheValidity: cc.CacheValidity, + } vcsClient, err := vcsclient.NewClientBuilder(param.ScmType).ApiEndpoint(param.ScmApiUrl).Token(param.Token).Build() if err != nil { return nil, err @@ -285,37 +350,121 @@ func (cc *CountContributorsCommand) getVcsCountContributors() ([]VcsCountContrib return contributors, nil } -func (cc *VcsCountContributors) scanAndCollectCommitsInfo(repositories []string, uniqueContributors map[BasicContributor]Contributor, detailedContributors map[string]map[string]ContributorDetailedSummary, detailedRepos map[string]map[string]RepositoryDetailedSummary) (scannedRepos, skippedRepos []string, totalCommits int) { - // initialize commits query options. - commitsListOptions := vcsclient.GitCommitsQueryOptions{ +// scanAndCollectCommitsInfo scans repositories in parallel and returns aggregated results. 
+func (cc *VcsCountContributors) scanAndCollectCommitsInfo(repositories []string) vcsServerScanResult { + cacheValidity := time.Duration(cc.params.CacheValidity) * 24 * time.Hour + + // Compute cache directory once (best-effort; empty string disables cache). + cacheDir := "" + if cc.params.CacheValidity >= 0 { + dir, err := getRepoCacheDir(cc.params.BasicGitServerParams, cc.params.MonthsNum) + if err != nil { + log.Warn("Contributors cache: failed to determine cache directory: %v. Continuing without cache.", err) + } else { + cacheDir = dir + } + } + + baseOptions := vcsclient.GitCommitsQueryOptions{ Since: time.Now().AddDate(0, -1*cc.params.MonthsNum, 0), ListOptions: vcsclient.ListOptions{ Page: 1, PerPage: vcsutils.NumberOfCommitsToFetch, }, } - for _, repo := range repositories { - // Get repository's commits using pagination until there are no more commits. - commits, getCommitsErr := cc.GetCommitsWithQueryOptions(repo, commitsListOptions) - for { - if getCommitsErr != nil { - skippedRepos = append(skippedRepos, repo) - break - } - if len(commits) == 0 { - break - } - cc.saveCommitsInfoInMaps(repo, commits, uniqueContributors, detailedContributors, detailedRepos) - commitsListOptions.Page++ - totalCommits += len(commits) - commits, getCommitsErr = cc.GetCommitsWithQueryOptions(repo, commitsListOptions) + repoResults := make([]repoScanResult, len(repositories)) + var repoResultsMu sync.Mutex + + threads := cc.params.Threads + if threads <= 0 { + threads = DefaultThreads + } + runner := parallel.NewRunner(threads, 20000, false) + for i, repo := range repositories { + if _, addErr := runner.AddTaskWithError(func(threadId int) error { + result := cc.scanRepo(repo, baseOptions, cacheDir, cacheValidity, threadId) + repoResultsMu.Lock() + repoResults[i] = result + repoResultsMu.Unlock() + return nil + }, func(taskErr error) { + log.Error("Error scanning repo %q: %v", repo, taskErr) + }); addErr != nil { + log.Error("Failed to enqueue repo %q for scanning: %v", repo, 
addErr) + } + } + runner.Done() + runner.Run() + + // Merge all repo results into the server-level result. + sr := vcsServerScanResult{ + uniqueContributors: make(map[BasicContributor]Contributor), + detailedContributors: make(map[string]map[string]ContributorDetailedSummary), + detailedRepos: make(map[string]map[string]RepositoryDetailedSummary), + } + for _, rr := range repoResults { + if rr.repo == "" { + continue + } + if rr.skipped { + sr.skippedRepos = append(sr.skippedRepos, rr.repo) + } else { + sr.scannedRepos = append(sr.scannedRepos, rr.repo) + sr.totalCommits += rr.totalCommits + mergeContributors(sr.uniqueContributors, rr.uniqueContributors) + mergeDetailedContributors(sr.detailedContributors, rr.detailedContributors) + mergeDetailedRepos(sr.detailedRepos, rr.detailedRepos) + } + } + return sr +} + +// scanRepo scans a single repository (with cache) and returns its result. +func (cc *VcsCountContributors) scanRepo(repo string, baseOptions vcsclient.GitCommitsQueryOptions, cacheDir string, cacheValidity time.Duration, threadId int) repoScanResult { + logPrefix := clientutils.GetLogMsgPrefix(threadId, false) + log.Info(logPrefix + fmt.Sprintf("Scanning repository %q", repo)) + + // Try cache first. + if cacheDir != "" { + if cached := readRepoCache(cacheDir, repo, cacheValidity); cached != nil { + log.Info(logPrefix + fmt.Sprintf("Using cached data for repository %q (%d commits)", repo, cached.totalCommits)) + return *cached + } + } + + result := repoScanResult{ + repo: repo, + uniqueContributors: make(map[BasicContributor]Contributor), + detailedContributors: make(map[string]map[string]ContributorDetailedSummary), + detailedRepos: make(map[string]map[string]RepositoryDetailedSummary), + } + + // Paginate through commits. 
+ opts := baseOptions + opts.Page = 1 + for { + commits, err := cc.GetCommitsWithQueryOptions(repo, opts) + if err != nil { + log.Warn(logPrefix + fmt.Sprintf("Skipping repository %q: failed to fetch commits: %v", repo, err)) + result.skipped = true + return result } - if getCommitsErr == nil { - scannedRepos = append(scannedRepos, repo) + if len(commits) == 0 { + break } + cc.saveCommitsInfoInMaps(repo, commits, result.uniqueContributors, result.detailedContributors, result.detailedRepos) + result.totalCommits += len(commits) + opts.Page++ } - return + + log.Info(logPrefix + fmt.Sprintf("Done scanning repository %q: %d commits found", repo, result.totalCommits)) + + // Persist to cache (even skipped=false means success here). + if cacheDir != "" { + writeRepoCache(cacheDir, repo, result, cc.params.MonthsNum) + } + return result } // getRepositoriesListToScan returns a list of repositories to scan. @@ -477,6 +626,41 @@ func (cc *CountContributorsCommand) aggregateReportResults(uniqueContributors ma return report } +// mergeContributors merges src into dst (dst wins on key collision — first-seen is latest commit). 
+func mergeContributors(dst, src map[BasicContributor]Contributor) { + for k, v := range src { + if _, exists := dst[k]; !exists { + dst[k] = v + } + } +} + +func mergeDetailedContributors(dst, src map[string]map[string]ContributorDetailedSummary) { + for email, repoMap := range src { + if dst[email] == nil { + dst[email] = make(map[string]ContributorDetailedSummary) + } + for repo, detail := range repoMap { + if _, exists := dst[email][repo]; !exists { + dst[email][repo] = detail + } + } + } +} + +func mergeDetailedRepos(dst, src map[string]map[string]RepositoryDetailedSummary) { + for repo, authorMap := range src { + if dst[repo] == nil { + dst[repo] = make(map[string]RepositoryDetailedSummary) + } + for email, detail := range authorMap { + if _, exists := dst[repo][email]; !exists { + dst[repo][email] = detail + } + } + } +} + // Returns the Server details. The usage report is sent to this server. func (cc *CountContributorsCommand) ServerDetails() (*config.ServerDetails, error) { return nil, nil diff --git a/commands/git/contributors/countcontributors_test.go b/commands/git/contributors/countcontributors_test.go index 2da8a2be0..113fa0562 100644 --- a/commands/git/contributors/countcontributors_test.go +++ b/commands/git/contributors/countcontributors_test.go @@ -1,6 +1,10 @@ package contributors import ( + "context" + "sort" + "time" + "github.com/jfrog/froggit-go/vcsclient" "github.com/jfrog/froggit-go/vcsutils" "github.com/stretchr/testify/assert" @@ -8,7 +12,6 @@ import ( "path/filepath" "reflect" "testing" - "time" ) func getCommitsListForTest(t *testing.T) []vcsclient.CommitInfo { @@ -319,6 +322,165 @@ func TestCountContributorsCommand_saveCommitsInfoInMaps_MultipleRepos(t *testing } } +// ---- merge helpers tests ---- + +func TestMergeContributors(t *testing.T) { + key1 := BasicContributor{Email: "a@example.com", Repo: "repo1"} + key2 := BasicContributor{Email: "b@example.com", Repo: "repo1"} + + contrib1 := Contributor{Email: "a@example.com", Name: "A"} 
+ contrib2 := Contributor{Email: "b@example.com", Name: "B"} + contrib1Alt := Contributor{Email: "a@example.com", Name: "A-alt"} + + t.Run("empty src", func(t *testing.T) { + dst := map[BasicContributor]Contributor{key1: contrib1} + mergeContributors(dst, map[BasicContributor]Contributor{}) + assert.Equal(t, contrib1, dst[key1]) + assert.Len(t, dst, 1) + }) + t.Run("non-overlapping keys merged", func(t *testing.T) { + dst := map[BasicContributor]Contributor{key1: contrib1} + mergeContributors(dst, map[BasicContributor]Contributor{key2: contrib2}) + assert.Equal(t, contrib1, dst[key1]) + assert.Equal(t, contrib2, dst[key2]) + }) + t.Run("collision: dst wins", func(t *testing.T) { + dst := map[BasicContributor]Contributor{key1: contrib1} + mergeContributors(dst, map[BasicContributor]Contributor{key1: contrib1Alt}) + assert.Equal(t, contrib1, dst[key1], "existing entry should not be overwritten") + }) +} + +func TestMergeDetailedContributors(t *testing.T) { + detail1 := ContributorDetailedSummary{RepoPath: "repo1", LastCommit: LastCommit{Date: "2024-01-01T00:00:00Z"}} + detail2 := ContributorDetailedSummary{RepoPath: "repo2", LastCommit: LastCommit{Date: "2024-02-01T00:00:00Z"}} + detailAlt := ContributorDetailedSummary{RepoPath: "repo1", LastCommit: LastCommit{Date: "2024-03-01T00:00:00Z"}} + + t.Run("empty src", func(t *testing.T) { + dst := map[string]map[string]ContributorDetailedSummary{ + "alice": {"repo1": detail1}, + } + mergeDetailedContributors(dst, map[string]map[string]ContributorDetailedSummary{}) + assert.Equal(t, detail1, dst["alice"]["repo1"]) + }) + t.Run("new email and new repo merged", func(t *testing.T) { + dst := map[string]map[string]ContributorDetailedSummary{ + "alice": {"repo1": detail1}, + } + src := map[string]map[string]ContributorDetailedSummary{ + "alice": {"repo2": detail2}, + "bob": {"repo1": detail1}, + } + mergeDetailedContributors(dst, src) + assert.Equal(t, detail2, dst["alice"]["repo2"]) + assert.Equal(t, detail1, 
dst["bob"]["repo1"]) + }) + t.Run("collision: dst wins", func(t *testing.T) { + dst := map[string]map[string]ContributorDetailedSummary{ + "alice": {"repo1": detail1}, + } + mergeDetailedContributors(dst, map[string]map[string]ContributorDetailedSummary{ + "alice": {"repo1": detailAlt}, + }) + assert.Equal(t, detail1, dst["alice"]["repo1"], "existing entry should not be overwritten") + }) +} + +func TestMergeDetailedRepos(t *testing.T) { + summary1 := RepositoryDetailedSummary{Email: "alice@example.com", LastCommit: LastCommit{Date: "2024-01-01T00:00:00Z"}} + summary2 := RepositoryDetailedSummary{Email: "bob@example.com", LastCommit: LastCommit{Date: "2024-02-01T00:00:00Z"}} + summaryAlt := RepositoryDetailedSummary{Email: "alice@example.com", LastCommit: LastCommit{Date: "2024-03-01T00:00:00Z"}} + + t.Run("empty src", func(t *testing.T) { + dst := map[string]map[string]RepositoryDetailedSummary{ + "repo1": {"alice@example.com": summary1}, + } + mergeDetailedRepos(dst, map[string]map[string]RepositoryDetailedSummary{}) + assert.Equal(t, summary1, dst["repo1"]["alice@example.com"]) + }) + t.Run("new repo and new author merged", func(t *testing.T) { + dst := map[string]map[string]RepositoryDetailedSummary{ + "repo1": {"alice@example.com": summary1}, + } + src := map[string]map[string]RepositoryDetailedSummary{ + "repo1": {"bob@example.com": summary2}, + "repo2": {"alice@example.com": summary1}, + } + mergeDetailedRepos(dst, src) + assert.Equal(t, summary2, dst["repo1"]["bob@example.com"]) + assert.Equal(t, summary1, dst["repo2"]["alice@example.com"]) + }) + t.Run("collision: dst wins", func(t *testing.T) { + dst := map[string]map[string]RepositoryDetailedSummary{ + "repo1": {"alice@example.com": summary1}, + } + mergeDetailedRepos(dst, map[string]map[string]RepositoryDetailedSummary{ + "repo1": {"alice@example.com": summaryAlt}, + }) + assert.Equal(t, summary1, dst["repo1"]["alice@example.com"], "existing entry should not be overwritten") + }) +} + +// ---- parallel 
scan test ---- + +func TestScanAndCollectCommitsInfo_Parallel(t *testing.T) { + ts1 := convertDateStrToTimestamp(t, "2024-01-10T10:00:00Z") + ts2 := convertDateStrToTimestamp(t, "2024-02-10T10:00:00Z") + ts3 := convertDateStrToTimestamp(t, "2024-03-10T10:00:00Z") + ts4 := convertDateStrToTimestamp(t, "2024-04-10T10:00:00Z") + + commitsByRepo := map[string][]vcsclient.CommitInfo{ + "repo1": { + {AuthorEmail: "email1@example.com", AuthorName: "Email1", Timestamp: ts1}, + {AuthorEmail: "email2@example.com", AuthorName: "Email2", Timestamp: ts2}, + }, + "repo2": { + {AuthorEmail: "email2@example.com", AuthorName: "Email2", Timestamp: ts3}, + {AuthorEmail: "email3@example.com", AuthorName: "Email3", Timestamp: ts4}, + }, + "repo3": {}, + } + + mock := &mockVcsClient{ + getCommitsWithQueryOptionsFn: func(_ context.Context, _, repo string, opts vcsclient.GitCommitsQueryOptions) ([]vcsclient.CommitInfo, error) { + // Return commits only on page 1; empty on subsequent pages to stop pagination. + if opts.Page == 1 { + return commitsByRepo[repo], nil + } + return nil, nil + }, + } + + vcc := VcsCountContributors{ + vcsClient: mock, + params: CountContributorsParams{ + BasicGitServerParams: BasicGitServerParams{Owner: "test-owner"}, + MonthsNum: 1, + Threads: 3, + CacheValidity: 0, // disable cache + }, + } + + sr := vcc.scanAndCollectCommitsInfo([]string{"repo1", "repo2", "repo3"}) + + // All three repos should be scanned (none skipped). + sort.Strings(sr.scannedRepos) + assert.Equal(t, []string{"repo1", "repo2", "repo3"}, sr.scannedRepos) + assert.Empty(t, sr.skippedRepos) + + // repo1 has 2 commits, repo2 has 2, repo3 has 0. + assert.Equal(t, 4, sr.totalCommits) + + // 4 unique (email, repo) pairs: email1+repo1, email2+repo1, email2+repo2, email3+repo2. 
+ assert.Len(t, sr.uniqueContributors, 4) + assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email1@example.com", Repo: "repo1"}) + assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email2@example.com", Repo: "repo1"}) + assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email2@example.com", Repo: "repo2"}) + assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email3@example.com", Repo: "repo2"}) +} + +// ---- helpers ---- + func convertDateStrToTimestamp(t *testing.T, dateStr string) int64 { date, err := time.Parse(time.RFC3339, dateStr) assert.NoError(t, err) diff --git a/commands/git/contributors/mock_vcs_client_test.go b/commands/git/contributors/mock_vcs_client_test.go new file mode 100644 index 000000000..fd5c22c84 --- /dev/null +++ b/commands/git/contributors/mock_vcs_client_test.go @@ -0,0 +1,164 @@ +package contributors + +import ( + "context" + + "github.com/jfrog/froggit-go/vcsclient" + "github.com/jfrog/froggit-go/vcsutils" +) + +// mockVcsClient is a test implementation of vcsclient.VcsClient. +// Only GetCommitsWithQueryOptions and ListRepositories are configurable; +// all other methods return zero values and no error. 
+type mockVcsClient struct { + getCommitsWithQueryOptionsFn func(ctx context.Context, owner, repo string, opts vcsclient.GitCommitsQueryOptions) ([]vcsclient.CommitInfo, error) + listRepositoriesFn func(ctx context.Context) (map[string][]string, error) +} + +func (m *mockVcsClient) GetCommitsWithQueryOptions(ctx context.Context, owner, repository string, options vcsclient.GitCommitsQueryOptions) ([]vcsclient.CommitInfo, error) { + if m.getCommitsWithQueryOptionsFn != nil { + return m.getCommitsWithQueryOptionsFn(ctx, owner, repository, options) + } + return nil, nil +} + +func (m *mockVcsClient) ListRepositories(ctx context.Context) (map[string][]string, error) { + if m.listRepositoriesFn != nil { + return m.listRepositoriesFn(ctx) + } + return nil, nil +} + +// Stub implementations for the remaining VcsClient interface methods. + +func (m *mockVcsClient) TestConnection(_ context.Context) error { return nil } +func (m *mockVcsClient) ListAppRepositories(_ context.Context) ([]vcsclient.AppRepositoryInfo, error) { + return nil, nil +} +func (m *mockVcsClient) ListBranches(_ context.Context, _, _ string) ([]string, error) { + return nil, nil +} +func (m *mockVcsClient) CreateWebhook(_ context.Context, _, _, _, _ string, _ ...vcsutils.WebhookEvent) (string, string, error) { + return "", "", nil +} +func (m *mockVcsClient) UpdateWebhook(_ context.Context, _, _, _, _, _, _ string, _ ...vcsutils.WebhookEvent) error { + return nil +} +func (m *mockVcsClient) DeleteWebhook(_ context.Context, _, _, _ string) error { return nil } +func (m *mockVcsClient) SetCommitStatus(_ context.Context, _ vcsclient.CommitStatus, _, _, _, _, _, _ string) error { + return nil +} +func (m *mockVcsClient) GetCommitStatuses(_ context.Context, _, _, _ string) ([]vcsclient.CommitStatusInfo, error) { + return nil, nil +} +func (m *mockVcsClient) DownloadRepository(_ context.Context, _, _, _, _ string) error { return nil } +func (m *mockVcsClient) CreatePullRequest(_ context.Context, _, _, _, _, _, 
_ string) error { + return nil +} +func (m *mockVcsClient) CreatePullRequestDetailed(_ context.Context, _, _, _, _, _, _ string) (vcsclient.CreatedPullRequestInfo, error) { + return vcsclient.CreatedPullRequestInfo{}, nil +} +func (m *mockVcsClient) UpdatePullRequest(_ context.Context, _, _, _, _, _ string, _ int, _ vcsutils.PullRequestState) error { + return nil +} +func (m *mockVcsClient) AddPullRequestComment(_ context.Context, _, _, _ string, _ int) error { + return nil +} +func (m *mockVcsClient) AddPullRequestReviewComments(_ context.Context, _ string, _ string, _ int, _ ...vcsclient.PullRequestComment) error { + return nil +} +func (m *mockVcsClient) ListPullRequestReviews(_ context.Context, _, _ string, _ int) ([]vcsclient.PullRequestReviewDetails, error) { + return nil, nil +} +func (m *mockVcsClient) ListPullRequestReviewComments(_ context.Context, _, _ string, _ int) ([]vcsclient.CommentInfo, error) { + return nil, nil +} +func (m *mockVcsClient) DeletePullRequestReviewComments(_ context.Context, _, _ string, _ int, _ ...vcsclient.CommentInfo) error { + return nil +} +func (m *mockVcsClient) ListPullRequestComments(_ context.Context, _, _ string, _ int) ([]vcsclient.CommentInfo, error) { + return nil, nil +} +func (m *mockVcsClient) DeletePullRequestComment(_ context.Context, _, _ string, _, _ int) error { + return nil +} +func (m *mockVcsClient) ListOpenPullRequestsWithBody(_ context.Context, _, _ string) ([]vcsclient.PullRequestInfo, error) { + return nil, nil +} +func (m *mockVcsClient) ListOpenPullRequests(_ context.Context, _, _ string) ([]vcsclient.PullRequestInfo, error) { + return nil, nil +} +func (m *mockVcsClient) GetPullRequestByID(_ context.Context, _, _ string, _ int) (vcsclient.PullRequestInfo, error) { + return vcsclient.PullRequestInfo{}, nil +} +func (m *mockVcsClient) GetLatestCommit(_ context.Context, _, _, _ string) (vcsclient.CommitInfo, error) { + return vcsclient.CommitInfo{}, nil +} +func (m *mockVcsClient) GetCommits(_ 
context.Context, _, _, _ string) ([]vcsclient.CommitInfo, error) { + return nil, nil +} +func (m *mockVcsClient) ListPullRequestsAssociatedWithCommit(_ context.Context, _, _, _ string) ([]vcsclient.PullRequestInfo, error) { + return nil, nil +} +func (m *mockVcsClient) AddSshKeyToRepository(_ context.Context, _, _, _, _ string, _ vcsclient.Permission) error { + return nil +} +func (m *mockVcsClient) GetRepositoryInfo(_ context.Context, _, _ string) (vcsclient.RepositoryInfo, error) { + return vcsclient.RepositoryInfo{}, nil +} +func (m *mockVcsClient) GetCommitBySha(_ context.Context, _, _, _ string) (vcsclient.CommitInfo, error) { + return vcsclient.CommitInfo{}, nil +} +func (m *mockVcsClient) CreateLabel(_ context.Context, _, _ string, _ vcsclient.LabelInfo) error { + return nil +} +func (m *mockVcsClient) GetLabel(_ context.Context, _, _, _ string) (*vcsclient.LabelInfo, error) { + return nil, nil +} +func (m *mockVcsClient) ListPullRequestLabels(_ context.Context, _, _ string, _ int) ([]string, error) { + return nil, nil +} +func (m *mockVcsClient) UnlabelPullRequest(_ context.Context, _, _, _ string, _ int) error { + return nil +} +func (m *mockVcsClient) UploadCodeScanning(_ context.Context, _, _, _, _ string) (string, error) { + return "", nil +} +func (m *mockVcsClient) UploadCodeScanningWithRef(_ context.Context, _, _, _, _, _ string) (string, error) { + return "", nil +} +func (m *mockVcsClient) DownloadFileFromRepo(_ context.Context, _, _, _, _ string) ([]byte, int, error) { + return nil, 0, nil +} +func (m *mockVcsClient) GetRepositoryEnvironmentInfo(_ context.Context, _, _, _ string) (vcsclient.RepositoryEnvironmentInfo, error) { + return vcsclient.RepositoryEnvironmentInfo{}, nil +} +func (m *mockVcsClient) GetModifiedFiles(_ context.Context, _, _, _, _ string) ([]string, error) { + return nil, nil +} +func (m *mockVcsClient) GetPullRequestCommentSizeLimit() int { return 0 } +func (m *mockVcsClient) GetPullRequestDetailsSizeLimit() int { return 0 } 
+func (m *mockVcsClient) CreateBranch(_ context.Context, _, _, _, _ string) error { return nil } +func (m *mockVcsClient) AllowWorkflows(_ context.Context, _ string) error { return nil } +func (m *mockVcsClient) AddOrganizationSecret(_ context.Context, _, _, _ string) error { + return nil +} +func (m *mockVcsClient) CreateOrgVariable(_ context.Context, _, _, _ string) error { return nil } +func (m *mockVcsClient) CommitAndPushFiles(_ context.Context, _, _, _, _, _, _ string, _ []vcsclient.FileToCommit) error { + return nil +} +func (m *mockVcsClient) GetRepoCollaborators(_ context.Context, _, _, _, _ string) ([]string, error) { + return nil, nil +} +func (m *mockVcsClient) GetRepoTeamsByPermissions(_ context.Context, _, _ string, _ []string) ([]int64, error) { + return nil, nil +} +func (m *mockVcsClient) CreateOrUpdateEnvironment(_ context.Context, _, _, _ string, _ []int64, _ []string) error { + return nil +} +func (m *mockVcsClient) MergePullRequest(_ context.Context, _, _ string, _ int, _ string) error { + return nil +} +func (m *mockVcsClient) UploadSnapshotToDependencyGraph(_ context.Context, _, _ string, _ *vcsclient.SbomSnapshot) error { + return nil +} diff --git a/utils/paths.go b/utils/paths.go index 8dcb3011f..a83c3342b 100644 --- a/utils/paths.go +++ b/utils/paths.go @@ -17,7 +17,8 @@ import ( ) const ( - JfrogCurationDirName = "curation" + JfrogCurationDirName = "curation" + JfrogContributorsDirName = "contributors-cache" CurationsDir = "JFROG_CLI_CURATION_DIR" @@ -37,6 +38,14 @@ func getJfrogCurationFolder() (string, error) { return filepath.Join(jfrogHome, JfrogCurationDirName), nil } +func GetContributorsCacheDir() (string, error) { + jfrogHome, err := coreutils.GetJfrogHomeDir() + if err != nil { + return "", err + } + return filepath.Join(jfrogHome, JfrogContributorsDirName), nil +} + func GetCurationCacheFolder() (string, error) { curationFolder, err := getJfrogCurationFolder() if err != nil { From f9e7c15cc29bbdea74f6ed0b5a61e9833d6920f6 Mon 
Sep 17 00:00:00 2001 From: attiasas Date: Sun, 22 Mar 2026 14:42:58 +0200 Subject: [PATCH 2/3] improve error handling and only one runner --- .../git/contributors/countcontributors.go | 198 +++++++++--------- .../contributors/countcontributors_test.go | 47 ++++- 2 files changed, 134 insertions(+), 111 deletions(-) diff --git a/commands/git/contributors/countcontributors.go b/commands/git/contributors/countcontributors.go index 0537fab4d..e6550caa4 100644 --- a/commands/git/contributors/countcontributors.go +++ b/commands/git/contributors/countcontributors.go @@ -230,14 +230,14 @@ type repoScanResult struct { skipped bool } -// vcsServerScanResult holds the aggregated results from scanning all repos of one VCS server. -type vcsServerScanResult struct { - uniqueContributors map[BasicContributor]Contributor - detailedContributors map[string]map[string]ContributorDetailedSummary - detailedRepos map[string]map[string]RepositoryDetailedSummary - scannedRepos []string - skippedRepos []string - totalCommits int +// repoScanTask holds everything needed to scan a single repository in parallel. +type repoScanTask struct { + vcc VcsCountContributors + repo string + baseOptions vcsclient.GitCommitsQueryOptions + cacheDir string + cacheValidity time.Duration + idx int } func (cc *CountContributorsCommand) Run() error { @@ -246,57 +246,47 @@ func (cc *CountContributorsCommand) Run() error { cc.Progress.SetHeadlineMsg("Calculating Git contributors information") } - uniqueContributors := make(map[BasicContributor]Contributor) - detailedContributors := make(map[string]map[string]ContributorDetailedSummary) - detailedRepos := make(map[string]map[string]RepositoryDetailedSummary) - var totalScannedRepos []string - var totalSkippedRepos []string - totalCommitsNumber := 0 - vcsCountContributors, err := cc.getVcsCountContributors() if err != nil { return err } - // Scan all VCS servers in parallel. 
- serverResults := make([]vcsServerScanResult, len(vcsCountContributors)) - var serverResultsMu sync.Mutex + tasks, err := buildRepoScanTasks(vcsCountContributors) + if err != nil { + return err + } + threads := cc.Threads if threads <= 0 { threads = DefaultThreads } - runner := parallel.NewRunner(threads, 20000, false) - for i, vcc := range vcsCountContributors { - if _, addErr := runner.AddTaskWithError(func(threadId int) error { - logPrefix := clientutils.GetLogMsgPrefix(threadId, false) - log.Info(logPrefix + fmt.Sprintf("Scanning VCS server: owner=%q, url=%q", vcc.params.Owner, vcc.params.ScmApiUrl)) - repositories, repoListErr := vcc.getRepositoriesListToScan() - if repoListErr != nil { - return repoListErr - } - log.Info(logPrefix + fmt.Sprintf("Found %d repositories for %q, scanning in parallel", len(repositories), vcc.params.Owner)) - result := vcc.scanAndCollectCommitsInfo(repositories) - serverResultsMu.Lock() - serverResults[i] = result - serverResultsMu.Unlock() - return nil - }, func(taskErr error) { - log.Error("Error scanning VCS server: %v", taskErr) - }); addErr != nil { - return fmt.Errorf("failed to add VCS server scan task: %w", addErr) + + repoResults, scanErr := runRepoScanTasks(tasks, threads) + + // Merge results sequentially. 
+ uniqueContributors := make(map[BasicContributor]Contributor) + detailedContributors := make(map[string]map[string]ContributorDetailedSummary) + detailedRepos := make(map[string]map[string]RepositoryDetailedSummary) + var totalScannedRepos []string + var totalSkippedRepos []string + totalCommitsNumber := 0 + for _, rr := range repoResults { + if rr.repo == "" { + continue + } + if rr.skipped { + totalSkippedRepos = append(totalSkippedRepos, rr.repo) + } else { + totalScannedRepos = append(totalScannedRepos, rr.repo) + totalCommitsNumber += rr.totalCommits + mergeContributors(uniqueContributors, rr.uniqueContributors) + mergeDetailedContributors(detailedContributors, rr.detailedContributors) + mergeDetailedRepos(detailedRepos, rr.detailedRepos) } } - runner.Done() - runner.Run() - // Merge server results sequentially. - for _, sr := range serverResults { - mergeContributors(uniqueContributors, sr.uniqueContributors) - mergeDetailedContributors(detailedContributors, sr.detailedContributors) - mergeDetailedRepos(detailedRepos, sr.detailedRepos) - totalScannedRepos = append(totalScannedRepos, sr.scannedRepos...) - totalSkippedRepos = append(totalSkippedRepos, sr.skippedRepos...) - totalCommitsNumber += sr.totalCommits + if scanErr != nil { + return scanErr } // Create the report. @@ -350,74 +340,82 @@ func (cc *CountContributorsCommand) getVcsCountContributors() ([]VcsCountContrib return contributors, nil } -// scanAndCollectCommitsInfo scans repositories in parallel and returns aggregated results. -func (cc *VcsCountContributors) scanAndCollectCommitsInfo(repositories []string) vcsServerScanResult { - cacheValidity := time.Duration(cc.params.CacheValidity) * 24 * time.Hour - - // Compute cache directory once (best-effort; empty string disables cache). 
- cacheDir := "" - if cc.params.CacheValidity >= 0 { - dir, err := getRepoCacheDir(cc.params.BasicGitServerParams, cc.params.MonthsNum) +// buildRepoScanTasks iterates all VCS servers sequentially, lists their repositories, +// and returns a flat slice of repoScanTask ready for parallel execution. +// Fails fast if any server's repository listing fails. +func buildRepoScanTasks(vcsCountContributors []VcsCountContributors) ([]repoScanTask, error) { + var tasks []repoScanTask + for _, vcc := range vcsCountContributors { + repositories, err := vcc.getRepositoriesListToScan() if err != nil { - log.Warn("Contributors cache: failed to determine cache directory: %v. Continuing without cache.", err) - } else { - cacheDir = dir + return nil, fmt.Errorf("failed to list repositories for %q: %w", vcc.params.Owner, err) } - } - baseOptions := vcsclient.GitCommitsQueryOptions{ - Since: time.Now().AddDate(0, -1*cc.params.MonthsNum, 0), - ListOptions: vcsclient.ListOptions{ - Page: 1, - PerPage: vcsutils.NumberOfCommitsToFetch, - }, + cacheValidity := time.Duration(vcc.params.CacheValidity) * 24 * time.Hour + cacheDir := "" + if vcc.params.CacheValidity >= 0 { + if dir, cacheErr := getRepoCacheDir(vcc.params.BasicGitServerParams, vcc.params.MonthsNum); cacheErr != nil { + log.Warn(fmt.Sprintf("Contributors cache: failed to determine cache directory: %v. 
Continuing without cache.", cacheErr)) + } else { + cacheDir = dir + } + } + baseOptions := vcsclient.GitCommitsQueryOptions{ + Since: time.Now().AddDate(0, -1*vcc.params.MonthsNum, 0), + ListOptions: vcsclient.ListOptions{ + Page: 1, + PerPage: vcsutils.NumberOfCommitsToFetch, + }, + } + + for _, repo := range repositories { + tasks = append(tasks, repoScanTask{ + vcc: vcc, + repo: repo, + baseOptions: baseOptions, + cacheDir: cacheDir, + cacheValidity: cacheValidity, + idx: len(tasks), + }) + } } + return tasks, nil +} - repoResults := make([]repoScanResult, len(repositories)) - var repoResultsMu sync.Mutex +// runRepoScanTasks scans all repositories using a single parallel runner. +// Errors from scan failures or enqueue failures are collected and returned joined. +// Failed-to-enqueue repos are marked as skipped, consistent with scan failures. +func runRepoScanTasks(tasks []repoScanTask, threads int) ([]repoScanResult, error) { + repoResults := make([]repoScanResult, len(tasks)) + var mu sync.Mutex + var taskErrors []error - threads := cc.params.Threads - if threads <= 0 { - threads = DefaultThreads - } runner := parallel.NewRunner(threads, 20000, false) - for i, repo := range repositories { - if _, addErr := runner.AddTaskWithError(func(threadId int) error { - result := cc.scanRepo(repo, baseOptions, cacheDir, cacheValidity, threadId) - repoResultsMu.Lock() - repoResults[i] = result - repoResultsMu.Unlock() + for _, task := range tasks { + if _, addErr := runner.AddTask(func(threadId int) error { + result := task.vcc.scanRepo(task.repo, task.baseOptions, task.cacheDir, task.cacheValidity, threadId) + mu.Lock() + repoResults[task.idx] = result + if result.skipped { + taskErrors = append(taskErrors, fmt.Errorf("failed to scan repository %q", task.repo)) + } + mu.Unlock() return nil - }, func(taskErr error) { - log.Error("Error scanning repo %q: %v", repo, taskErr) }); addErr != nil { - log.Error("Failed to enqueue repo %q for scanning: %v", repo, addErr) + // 
Failed to enqueue: mark repo as skipped, same treatment as a scan failure. + mu.Lock() + repoResults[task.idx] = repoScanResult{repo: task.repo, skipped: true} + taskErrors = append(taskErrors, fmt.Errorf("failed to enqueue repository %q: %w", task.repo, addErr)) + mu.Unlock() } } runner.Done() runner.Run() - // Merge all repo results into the server-level result. - sr := vcsServerScanResult{ - uniqueContributors: make(map[BasicContributor]Contributor), - detailedContributors: make(map[string]map[string]ContributorDetailedSummary), - detailedRepos: make(map[string]map[string]RepositoryDetailedSummary), - } - for _, rr := range repoResults { - if rr.repo == "" { - continue - } - if rr.skipped { - sr.skippedRepos = append(sr.skippedRepos, rr.repo) - } else { - sr.scannedRepos = append(sr.scannedRepos, rr.repo) - sr.totalCommits += rr.totalCommits - mergeContributors(sr.uniqueContributors, rr.uniqueContributors) - mergeDetailedContributors(sr.detailedContributors, rr.detailedContributors) - mergeDetailedRepos(sr.detailedRepos, rr.detailedRepos) - } + if len(taskErrors) > 0 { + return repoResults, errors.Join(taskErrors...) } - return sr + return repoResults, nil } // scanRepo scans a single repository (with cache) and returns its result. 
diff --git a/commands/git/contributors/countcontributors_test.go b/commands/git/contributors/countcontributors_test.go index 113fa0562..9d8b5a9d0 100644 --- a/commands/git/contributors/countcontributors_test.go +++ b/commands/git/contributors/countcontributors_test.go @@ -457,26 +457,51 @@ func TestScanAndCollectCommitsInfo_Parallel(t *testing.T) { BasicGitServerParams: BasicGitServerParams{Owner: "test-owner"}, MonthsNum: 1, Threads: 3, - CacheValidity: 0, // disable cache + CacheValidity: -1, // disable cache }, } - sr := vcc.scanAndCollectCommitsInfo([]string{"repo1", "repo2", "repo3"}) + baseOptions := vcsclient.GitCommitsQueryOptions{ + Since: time.Now().AddDate(0, -1, 0), + ListOptions: vcsclient.ListOptions{Page: 1, PerPage: vcsutils.NumberOfCommitsToFetch}, + } + repos := []string{"repo1", "repo2", "repo3"} + tasks := make([]repoScanTask, len(repos)) + for i, repo := range repos { + tasks[i] = repoScanTask{vcc: vcc, repo: repo, baseOptions: baseOptions, idx: i} + } + + repoResults, err := runRepoScanTasks(tasks, 3) + assert.NoError(t, err) + + // Merge results the same way Run() does. + var scannedRepos, skippedRepos []string + uniqueContributors := make(map[BasicContributor]Contributor) + totalCommits := 0 + for _, rr := range repoResults { + if rr.skipped { + skippedRepos = append(skippedRepos, rr.repo) + } else { + scannedRepos = append(scannedRepos, rr.repo) + totalCommits += rr.totalCommits + mergeContributors(uniqueContributors, rr.uniqueContributors) + } + } // All three repos should be scanned (none skipped). - sort.Strings(sr.scannedRepos) - assert.Equal(t, []string{"repo1", "repo2", "repo3"}, sr.scannedRepos) - assert.Empty(t, sr.skippedRepos) + sort.Strings(scannedRepos) + assert.Equal(t, []string{"repo1", "repo2", "repo3"}, scannedRepos) + assert.Empty(t, skippedRepos) // repo1 has 2 commits, repo2 has 2, repo3 has 0. 
- assert.Equal(t, 4, sr.totalCommits) + assert.Equal(t, 4, totalCommits) // 4 unique (email, repo) pairs: email1+repo1, email2+repo1, email2+repo2, email3+repo2. - assert.Len(t, sr.uniqueContributors, 4) - assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email1@example.com", Repo: "repo1"}) - assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email2@example.com", Repo: "repo1"}) - assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email2@example.com", Repo: "repo2"}) - assert.Contains(t, sr.uniqueContributors, BasicContributor{Email: "email3@example.com", Repo: "repo2"}) + assert.Len(t, uniqueContributors, 4) + assert.Contains(t, uniqueContributors, BasicContributor{Email: "email1@example.com", Repo: "repo1"}) + assert.Contains(t, uniqueContributors, BasicContributor{Email: "email2@example.com", Repo: "repo1"}) + assert.Contains(t, uniqueContributors, BasicContributor{Email: "email2@example.com", Repo: "repo2"}) + assert.Contains(t, uniqueContributors, BasicContributor{Email: "email3@example.com", Repo: "repo2"}) } // ---- helpers ---- From 24253040c256ea0cd80ee41afb4add9934d39edd Mon Sep 17 00:00:00 2001 From: attiasas Date: Sun, 22 Mar 2026 16:04:43 +0200 Subject: [PATCH 3/3] fix logs --- commands/git/contributors/cache.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/commands/git/contributors/cache.go b/commands/git/contributors/cache.go index b1560a583..6e60be883 100644 --- a/commands/git/contributors/cache.go +++ b/commands/git/contributors/cache.go @@ -65,19 +65,19 @@ func readRepoCache(cacheDir, repo string, maxAge time.Duration) *repoScanResult } var entry repoCacheFile if err = json.Unmarshal(data, &entry); err != nil { - log.Warn("Contributors cache: failed to parse cache file %s: %v", path, err) + log.Warn(fmt.Sprintf("Contributors cache: failed to parse cache file %s: %v", path, err)) return nil } scannedAt, err := time.Parse(time.RFC3339, entry.ScannedAt) if err != nil 
{ - log.Warn("Contributors cache: invalid scanned_at in %s: %v", path, err) + log.Warn(fmt.Sprintf("Contributors cache: invalid scanned_at in %s: %v", path, err)) return nil } if time.Since(scannedAt) > maxAge { - log.Debug("Contributors cache: entry for %q expired (scanned %s ago)", repo, time.Since(scannedAt).Round(time.Second)) + log.Debug(fmt.Sprintf("Contributors cache: entry for %q expired (scanned %s ago)", repo, time.Since(scannedAt).Round(time.Second))) return nil } - log.Debug("Contributors cache: using cached data for repo %q (scanned at %s)", repo, entry.ScannedAt) + log.Debug(fmt.Sprintf("Contributors cache: using cached data for repo %q (scanned at %s)", repo, entry.ScannedAt)) uniqueContributors := make(map[BasicContributor]Contributor, len(entry.UniqueContributors)) for _, e := range entry.UniqueContributors { uniqueContributors[e.Key] = e.Value @@ -110,17 +110,17 @@ func writeRepoCache(cacheDir, repo string, result repoScanResult, months int) { } data, err := json.Marshal(entry) if err != nil { - log.Warn("Contributors cache: failed to marshal cache for repo %q: %v", repo, err) + log.Warn(fmt.Sprintf("Contributors cache: failed to marshal cache for repo %q: %v", repo, err)) return } finalPath := filepath.Join(cacheDir, sanitizeFilename(repo)+".json") tmpPath := finalPath + ".tmp" if err = os.WriteFile(tmpPath, data, 0600); err != nil { - log.Warn("Contributors cache: failed to write tmp file %s: %v", tmpPath, err) + log.Warn(fmt.Sprintf("Contributors cache: failed to write tmp file %s: %v", tmpPath, err)) return } if err = os.Rename(tmpPath, finalPath); err != nil { - log.Warn("Contributors cache: failed to rename %s → %s: %v", tmpPath, finalPath, err) + log.Warn(fmt.Sprintf("Contributors cache: failed to rename %s → %s: %v", tmpPath, finalPath, err)) _ = os.Remove(tmpPath) } }