Skip to content

Commit d7b1783

Browse files
petr-muller and claude committed
fix(private-org-sync): process repos in parallel with configurable workers
Extract per-repo sync logic into a syncRepo function and process repos concurrently using a worker pool. The --parallelism flag (default 4) controls the number of concurrent workers. Each worker gets its own copy of the gitSyncer struct, avoiding races on the mutable logger field.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 867f609 commit d7b1783

1 file changed

Lines changed: 119 additions & 82 deletions

File tree

cmd/private-org-sync/main.go

Lines changed: 119 additions & 82 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"os/exec"
1010
"path/filepath"
1111
"strings"
12+
"sync"
1213
"time"
1314

1415
"github.com/sirupsen/logrus"
@@ -65,6 +66,7 @@ type options struct {
6566
confirm bool
6667
failOnNonexistentDst bool
6768
debug bool
69+
parallelism int
6870
}
6971

7072
const defaultPrefix = "https://github.com"
@@ -144,6 +146,7 @@ func gatherOptions() options {
144146
fs.BoolVar(&o.failOnNonexistentDst, "fail-on-missing-destination", false, "Set true to make the tool to consider missing sync destination as an error")
145147

146148
fs.BoolVar(&o.debug, "debug", false, "Set true to enable debug logging level")
149+
fs.IntVar(&o.parallelism, "parallelism", 4, "Number of repos to sync in parallel")
147150

148151
o.Options.Bind(fs)
149152
o.WhitelistOptions.Bind(fs)
@@ -362,6 +365,8 @@ type location struct {
362365
org, repo, branch string
363366
}
364367

368+
type repoKey struct{ org, repo string }
369+
365370
func (l location) String() string {
366371
return fmt.Sprintf("%s/%s@%s", l.org, l.repo, l.branch)
367372
}
@@ -675,7 +680,6 @@ func main() {
675680
}
676681

677682
// Group locations by (org, repo) so we can initialize each repo once
678-
type repoKey struct{ org, repo string }
679683
grouped := make(map[repoKey][]location)
680684
for source := range locations {
681685
key := repoKey{org: source.org, repo: source.repo}
@@ -688,108 +692,141 @@ func main() {
688692
flattenedOrgs.Insert(o.org)
689693
}
690694

695+
type repoWork struct {
696+
key repoKey
697+
branches []location
698+
}
699+
work := make(chan repoWork)
700+
var errsMu sync.Mutex
701+
var wg sync.WaitGroup
702+
703+
for i := 0; i < o.parallelism; i++ {
704+
wg.Add(1)
705+
go func() {
706+
defer wg.Done()
707+
for item := range work {
708+
repoErrs := syncRepo(syncer, o.targetOrg, flattenedOrgs, item.key, item.branches)
709+
if len(repoErrs) > 0 {
710+
errsMu.Lock()
711+
errs = append(errs, repoErrs...)
712+
errsMu.Unlock()
713+
}
714+
}
715+
}()
716+
}
717+
691718
for key, branches := range grouped {
692-
repoLogger := logrus.WithFields(logrus.Fields{"org": key.org, "repo": key.repo})
693-
syncer.logger = repoLogger
719+
work <- repoWork{key: key, branches: branches}
720+
}
721+
close(work)
722+
wg.Wait()
694723

695-
dstRepo := key.repo
696-
if !flattenedOrgs.Has(key.org) {
697-
dstRepo = fmt.Sprintf("%s-%s", key.org, key.repo)
698-
}
724+
if len(errs) > 0 {
725+
logrus.WithError(utilerrors.NewAggregate(errs)).Fatal("There were failures")
726+
}
727+
}
699728

700-
gitDir, err := syncer.makeGitDir(key.org, key.repo)
701-
if err != nil {
702-
for _, source := range branches {
703-
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
704-
}
705-
continue
706-
}
729+
func syncRepo(syncer gitSyncer, targetOrg string, flattenedOrgs sets.Set[string], key repoKey, branches []location) []error {
730+
repoLogger := logrus.WithFields(logrus.Fields{"org": key.org, "repo": key.repo})
731+
syncer.logger = repoLogger
707732

708-
if err := syncer.initRepo(gitDir, key.org, key.repo); err != nil {
709-
for _, source := range branches {
710-
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
711-
}
712-
continue
713-
}
733+
var errs []error
714734

715-
// ls-remote source and destination in parallel
716-
destUrlRaw := fmt.Sprintf("%s/%s/%s", syncer.prefix, o.targetOrg, dstRepo)
717-
destUrl, err := url.Parse(destUrlRaw)
718-
if err != nil {
719-
repoLogger.WithField("remote-url", destUrlRaw).WithError(err).Error("Failed to construct URL for the destination remote")
720-
for _, source := range branches {
721-
errs = append(errs, fmt.Errorf("%s: failed to construct URL for the destination remote", source.String()))
722-
}
723-
continue
724-
}
725-
if syncer.token != "" {
726-
destUrl.User = url.User(syncer.token)
735+
dstRepo := key.repo
736+
if !flattenedOrgs.Has(key.org) {
737+
dstRepo = fmt.Sprintf("%s-%s", key.org, key.repo)
738+
}
739+
740+
gitDir, err := syncer.makeGitDir(key.org, key.repo)
741+
if err != nil {
742+
for _, source := range branches {
743+
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
727744
}
745+
return errs
746+
}
728747

729-
srcRemote := fmt.Sprintf("%s-%s", key.org, key.repo)
748+
if err := syncer.initRepo(gitDir, key.org, key.repo); err != nil {
749+
for _, source := range branches {
750+
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
751+
}
752+
return errs
753+
}
730754

731-
type lsRemoteResult struct {
732-
heads RemoteBranchHeads
733-
err error
755+
// ls-remote source and destination in parallel
756+
destUrlRaw := fmt.Sprintf("%s/%s/%s", syncer.prefix, targetOrg, dstRepo)
757+
destUrl, err := url.Parse(destUrlRaw)
758+
if err != nil {
759+
repoLogger.WithField("remote-url", destUrlRaw).WithError(err).Error("Failed to construct URL for the destination remote")
760+
for _, source := range branches {
761+
errs = append(errs, fmt.Errorf("%s: failed to construct URL for the destination remote", source.String()))
734762
}
735-
dstResult := make(chan lsRemoteResult, 1)
736-
srcResult := make(chan lsRemoteResult, 1)
737-
go func() {
738-
heads, err := getRemoteBranchHeads(repoLogger, syncer.git, gitDir, destUrl.String())
739-
dstResult <- lsRemoteResult{heads, err}
740-
}()
741-
go func() {
742-
heads, err := getRemoteBranchHeads(repoLogger, withRetryOnNonzero(syncer.git, 5), gitDir, srcRemote)
743-
srcResult <- lsRemoteResult{heads, err}
744-
}()
763+
return errs
764+
}
765+
if syncer.token != "" {
766+
destUrl.User = url.User(syncer.token)
767+
}
745768

746-
dst := <-dstResult
747-
src := <-srcResult
769+
srcRemote := fmt.Sprintf("%s-%s", key.org, key.repo)
748770

749-
if dst.err != nil {
750-
message := "destination repository does not exist or we cannot access it"
751-
if syncer.failOnNonexistentDst {
752-
repoLogger.Errorf("%s", message)
753-
for _, source := range branches {
754-
errs = append(errs, fmt.Errorf("%s: %s", source.String(), message))
755-
}
756-
} else {
757-
repoLogger.Warn(message)
758-
}
759-
continue
760-
}
771+
type lsRemoteResult struct {
772+
heads RemoteBranchHeads
773+
err error
774+
}
775+
dstResult := make(chan lsRemoteResult, 1)
776+
srcResult := make(chan lsRemoteResult, 1)
777+
go func() {
778+
heads, err := getRemoteBranchHeads(repoLogger, syncer.git, gitDir, destUrl.String())
779+
dstResult <- lsRemoteResult{heads, err}
780+
}()
781+
go func() {
782+
heads, err := getRemoteBranchHeads(repoLogger, withRetryOnNonzero(syncer.git, 5), gitDir, srcRemote)
783+
srcResult <- lsRemoteResult{heads, err}
784+
}()
761785

762-
if src.err != nil {
763-
repoLogger.WithError(src.err).Error("Failed to determine branch HEADs in source")
786+
dst := <-dstResult
787+
src := <-srcResult
788+
789+
if dst.err != nil {
790+
message := "destination repository does not exist or we cannot access it"
791+
if syncer.failOnNonexistentDst {
792+
repoLogger.Errorf("%s", message)
764793
for _, source := range branches {
765-
errs = append(errs, fmt.Errorf("%s: failed to determine branch HEADs in source", source.String()))
794+
errs = append(errs, fmt.Errorf("%s: %s", source.String(), message))
766795
}
767-
continue
796+
} else {
797+
repoLogger.Warn(message)
768798
}
799+
return errs
800+
}
769801

770-
dstHeads := dst.heads
771-
srcHeads := src.heads
772-
802+
if src.err != nil {
803+
repoLogger.WithError(src.err).Error("Failed to determine branch HEADs in source")
773804
for _, source := range branches {
774-
syncer.logger = config.LoggerForInfo(config.Info{
775-
Metadata: api.Metadata{
776-
Org: source.org,
777-
Repo: source.repo,
778-
Branch: source.branch,
779-
},
780-
})
781-
782-
destination := location{org: o.targetOrg, repo: dstRepo, branch: source.branch}
783-
784-
if err := syncer.mirror(gitDir, source, destination, srcHeads, dstHeads, destUrl); err != nil {
785-
errs = append(errs, fmt.Errorf("%s->%s: %w", source.String(), destination.String(), err))
786-
}
805+
errs = append(errs, fmt.Errorf("%s: failed to determine branch HEADs in source", source.String()))
787806
}
807+
return errs
788808
}
789809

790-
if len(errs) > 0 {
791-
logrus.WithError(utilerrors.NewAggregate(errs)).Fatal("There were failures")
810+
dstHeads := dst.heads
811+
srcHeads := src.heads
812+
813+
for _, source := range branches {
814+
syncer.logger = config.LoggerForInfo(config.Info{
815+
Metadata: api.Metadata{
816+
Org: source.org,
817+
Repo: source.repo,
818+
Branch: source.branch,
819+
},
820+
})
821+
822+
destination := location{org: targetOrg, repo: dstRepo, branch: source.branch}
823+
824+
if err := syncer.mirror(gitDir, source, destination, srcHeads, dstHeads, destUrl); err != nil {
825+
errs = append(errs, fmt.Errorf("%s->%s: %w", source.String(), destination.String(), err))
826+
}
792827
}
828+
829+
return errs
793830
}
794831

795832
func getWhitelistedLocations(whitelist map[string][]string, git gitFunc, prefix, token string) (map[location]struct{}, []error) {

0 commit comments

Comments
 (0)