diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d4c32b7f2..c0d51e56a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -137,7 +137,7 @@ make docker-unit-tests In order to run aptly system tests, enter the following: ``` -make docker-system-tests +make docker-system-test ``` #### Running golangci-lint diff --git a/Makefile b/Makefile index 770d9d9dc..96f44f4a0 100644 --- a/Makefile +++ b/Makefile @@ -182,6 +182,9 @@ binaries: prepare swagger ## Build binary releases (FreeBSD, macOS, Linux gener docker-image: ## Build aptly-dev docker image @docker build -f system/Dockerfile . -t aptly-dev +docker-image-test: # Build aptly-test docker image for testing + @docker build -f docker/test.Dockerfile . -t aptly-test + docker-image-no-cache: ## Build aptly-dev docker image (no cache) @docker build --no-cache -f system/Dockerfile . -t aptly-dev @@ -241,4 +244,4 @@ clean: ## remove local build and module cache rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true -.PHONY: help man prepare swagger version binaries build docker-release docker-system-tests docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8 +.PHONY: help man prepare swagger version binaries build docker-release docker-system-test docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8 diff --git a/api/mirror.go b/api/mirror.go index 743101fc9..afc7d0079 100644 --- a/api/mirror.go +++ b/api/mirror.go @@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) { name := c.Params.ByName("name") force := c.Request.URL.Query().Get("force") == "1" + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() mirrorCollection := collectionFactory.RemoteRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() repo, err := mirrorCollection.ByName(name) if err != nil { @@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete mirror %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := repo.CheckLock() + // Phase 2: Inside task lock - create fresh collections + taskCollectionFactory := context.NewCollectionFactory() + taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load after lock acquired + repo, err := taskMirrorCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) + } + + err = repo.CheckLock() if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) } if !force { - snapshots := snapshotCollection.ByRemoteRepoSource(repo) + // Fresh checks with current collections + snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override") } } - err = mirrorCollection.Drop(repo) + err = taskMirrorCollection.Drop(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) } @@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.RemoteRepoCollection() - remote, err = collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + remote, err = collection.ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) { return } + // Pre-task validation of new name if provided if b.Name != remote.Name { _, err = collection.ByName(b.Name) if err == nil { @@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) { resources := []string{string(remote.Key())} maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.RemoteRepoCollection() + + // Fresh load after lock acquired (use captured `name` variable, not gin context) + remote, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + // Fresh rename check inside lock (if renaming) + if b.Name != remote.Name { + _, err := taskCollection.ByName(b.Name) + if err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name) + } + } downloader := context.NewDownloader(out) - err := remote.Fetch(downloader, verifier, b.IgnoreSignatures) + err = remote.Fetch(downloader, verifier, b.IgnoreSignatures) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -580,14 +612,14 @@ func apiMirrorsUpdate(c *gin.Context) { } } - err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck) + err = remote.DownloadPackageIndexes(out, downloader, verifier, taskCollectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } if remote.DownloadAppStream && !remote.IsFlat() { err = remote.DownloadAppStreamFiles(out, downloader, - context.PackagePool(), collectionFactory.ChecksumCollection(nil), b.IgnoreChecksums) + context.PackagePool(), taskCollectionFactory.ChecksumCollection(nil), b.IgnoreChecksums) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -607,8 +639,8 @@ func apiMirrorsUpdate(c *gin.Context) { } } - queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(), - collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly) + queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), taskCollectionFactory.PackageCollection(), + taskCollectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -618,12 +650,12 @@ func apiMirrorsUpdate(c *gin.Context) { e := context.ReOpenDatabase() if e == nil { remote.MarkAsIdle() - _ = collection.Update(remote) + _ = taskCollection.Update(remote) } }() remote.MarkAsUpdating() - err = collection.Update(remote) + err = taskCollection.Update(remote) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -727,7 +759,7 @@ func apiMirrorsUpdate(c *gin.Context) { } // and import it back to the pool - task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil)) + task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, taskCollectionFactory.ChecksumCollection(nil)) if err != nil { //return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err) pushError(err) @@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) { } log.Info().Msgf("%s: Finalizing download...", b.Name) - _ = remote.FinalizeDownload(collectionFactory, out) - err = collectionFactory.RemoteRepoCollection().Update(remote) + _ = remote.FinalizeDownload(taskCollectionFactory, out) + err = taskCollection.Update(remote) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } diff --git a/api/publish.go b/api/publish.go index 67b260d47..2a6b0c0ba 100644 --- a/api/publish.go +++ b/api/publish.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "log" "net/http" "strings" @@ -298,11 +299,25 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { multiDist = *b.MultiDist } - collection := collectionFactory.PublishedRepoCollection() + // Pre-register the published repo key in resources so that concurrent + // POST requests for the same prefix/distribution are serialized by the + // task queue rather than racing on CheckDuplicate + Add. + if b.Distribution != "" { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + resources = append(resources, "U"+storagePrefix+">>"+b.Distribution) + } else { + log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix) + } taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + taskDetail := task.PublishDetail{ Detail: detail, } @@ -314,10 +329,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range sources { switch s := source.(type) { case *deb.Snapshot: - snapshotCollection := collectionFactory.SnapshotCollection() + snapshotCollection := taskCollectionFactory.SnapshotCollection() err = snapshotCollection.LoadComplete(s) case *deb.LocalRepo: - localCollection := collectionFactory.LocalRepoCollection() + localCollection := taskCollectionFactory.LocalRepoCollection() err = localCollection.LoadComplete(s) default: err = fmt.Errorf("unexpected type for source: %T", source) @@ -327,13 +342,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } } - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) + published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - resources = append(resources, string(published.Key())) - if b.Origin != "" { published.Origin = b.Origin } @@ -367,18 +380,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { published.Version = b.Version } - duplicate := collection.CheckDuplicate(published) + duplicate := taskCollection.CheckDuplicate(published) if duplicate != nil { - _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - err = collection.Add(published) + err = taskCollection.Add(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -458,6 +471,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { @@ -465,64 +479,78 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + resources := []string{string(published.Key())} + if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) return } + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } } else if published.SourceKind == deb.SourceSnapshot { for _, snapshotInfo := range b.Snapshots { - _, err2 := snapshotCollection.ByName(snapshotInfo.Name) + snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) if err2 != nil { AbortWithJSONError(c, http.StatusNotFound, err2) return } + resources = append(resources, string(snapshot.ResourceKey())) } } else { AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) return } - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - - resources := []string{string(published.Key())} + // Field mutations and fresh DB load are deferred to inside the task so + // they always operate on a consistent state after the lock is held. taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(published, collectionFactory) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + revision := published.ObtainRevision() sources := revision.Sources @@ -534,17 +562,17 @@ func apiPublishUpdateSwitch(c *gin.Context) { } } - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -552,7 +580,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -602,8 +630,11 @@ func apiPublishDrop(c *gin.Context) { resources := []string{string(published.Key())} taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, out, force, skipCleanup) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + err := taskCollection.Remove(context, storage, prefix, distribution, + taskCollectionFactory, out, force, skipCleanup) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) } @@ -639,43 +670,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if c.Bind(&b) != nil { - return - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } - revision := published.ObtainRevision() - sources := revision.Sources + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } - component := b.Component - name := b.Name + revision := published.ObtainRevision() + sources := revision.Sources - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } + component := b.Component + name := b.Name - sources[component] = name + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component) + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -757,39 +797,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if c.Bind(&b) != nil { - return - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -822,24 +871,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -875,51 +933,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - revision := published.ObtainRevision() - sources := revision.Sources + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - b.Component = component - b.Name = revision.Sources[component] + revision := published.ObtainRevision() + sources := revision.Sources - if c.Bind(&b) != nil { - return - } + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } - if b.Component != component { - delete(sources, component) - } + if b.Component != urlComponent { + delete(sources, urlComponent) + } - component = b.Component - name := b.Name - sources[component] = name + newComponent := b.Component + name := b.Name + sources[newComponent] = name - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -956,33 +1021,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } + resources := []string{string(published.Key())} + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - revision := published.ObtainRevision() - sources := revision.Sources + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } - delete(sources, component) + revision := published.ObtainRevision() + sources := revision.Sources - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } + + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1054,64 +1127,94 @@ func apiPublishUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and field mutations happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } + resources := []string{string(published.Key())} - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, + // because published.Update() reads from them and concurrent modification + // would produce an inconsistent view. + snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy + if published.SourceKind == deb.SourceLocalRepo { + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, uuid := range published.Sources { + snapshot, err2 := snapshotCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.ResourceKey())) + } } - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() - if b.Label != nil { - published.Label = *b.Label - } + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - if b.Origin != nil { - published.Origin = *b.Origin - } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } - if b.Version != nil { - published.Version = *b.Version - } + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } - resources := []string{string(published.Key())} - taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1119,7 +1222,7 @@ func apiPublishUpdate(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } diff --git a/api/repos.go b/api/repos.go index 1d6dfae3a..04e6f191e 100644 --- a/api/repos.go +++ b/api/repos.go @@ -131,46 +131,69 @@ func apiReposCreate(c *gin.Context) { return } - repo := deb.NewLocalRepo(b.Name, b.Comment) - repo.DefaultComponent = b.DefaultComponent - repo.DefaultDistribution = b.DefaultDistribution - + // Handler: Pre-task validations (shallow) collectionFactory := context.NewCollectionFactory() if b.FromSnapshot != "" { - var snapshot *deb.Snapshot - snapshotCollection := collectionFactory.SnapshotCollection() - snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + _, err := snapshotCollection.ByName(b.FromSnapshot) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) return } + // Just verify it exists - don't load here + } - err = snapshotCollection.LoadComplete(snapshot) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err)) - return + // Use generated key resource for repo being created + resources := []string{"LocalRepo:" + b.Name} + if b.FromSnapshot != "" { + resources = append(resources, "Snapshot:"+b.FromSnapshot) + } + + taskName := fmt.Sprintf("Create repository %s", b.Name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection and check/create ATOMIC inside task + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Check duplicate inside lock + if _, err := taskCollection.ByName(b.Name); err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %s already exists", b.Name) } - repo.UpdateRefList(snapshot.RefList()) - } + // Create repo + repo := deb.NewLocalRepo(b.Name, b.Comment) + repo.DefaultComponent = b.DefaultComponent + repo.DefaultDistribution = b.DefaultDistribution - localRepoCollection := collectionFactory.LocalRepoCollection() + if b.FromSnapshot != "" { + snapshotCollection := taskCollectionFactory.SnapshotCollection() - if _, err := localRepoCollection.ByName(b.Name); err == nil { - AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name)) - return - } + snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, + fmt.Errorf("source snapshot not found: %s", err) + } - err := localRepoCollection.Add(repo) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, err) - return - } + err = snapshotCollection.LoadComplete(snapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, + fmt.Errorf("unable to load source snapshot: %s", err) + } + + repo.UpdateRefList(snapshot.RefList()) + } + + err := taskCollection.Add(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } - c.JSON(http.StatusCreated, repo) + return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil + }) } type reposEditParams struct { @@ -201,6 +224,8 @@ func apiReposEdit(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Mutation and duplicate check happen inside the task for atomicity. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -211,32 +236,47 @@ func apiReposEdit(c *gin.Context) { return } - if b.Name != nil && *b.Name != name { - _, err := collection.ByName(*b.Name) - if err == nil { - // already exists - AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name)) - return + resources := []string{string(repo.Key())} + taskName := fmt.Sprintf("Edit repository %s", name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err } - repo.Name = *b.Name - } - if b.Comment != nil { - repo.Comment = *b.Comment - } - if b.DefaultDistribution != nil { - repo.DefaultDistribution = *b.DefaultDistribution - } - if b.DefaultComponent != nil { - repo.DefaultComponent = *b.DefaultComponent - } - err = collection.Update(repo) - if err != nil { - AbortWithJSONError(c, 500, err) - return - } + // Check and update ATOMIC (inside lock) + if b.Name != nil && *b.Name != name { + _, err := taskCollection.ByName(*b.Name) + if err == nil { + // already exists + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %q already exists", *b.Name) + } + repo.Name = *b.Name + } + if b.Comment != nil { + repo.Comment = *b.Comment + } + if b.DefaultDistribution != nil { + repo.DefaultDistribution = *b.DefaultDistribution + } + if b.DefaultComponent != nil { + repo.DefaultComponent = *b.DefaultComponent + } - c.JSON(200, repo) + err = taskCollection.Update(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil + }) } // GET /api/repos/:name @@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) { force := c.Request.URL.Query().Get("force") == "1" name := c.Params.ByName("name") + // Load shallowly for 404 check, resource key, and task name. + // Full checks (published/snapshots) happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() repo, err := collection.ByName(name) if err != nil { @@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.ByLocalRepo(repo) + // Task: Create fresh collections inside task after lock acquired + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Re-read repo with fresh collection after lock + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err) + } + + // Check with fresh collections + published := taskPublishedCollection.ByLocalRepo(repo) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published") } if !force { - snapshots := snapshotCollection.ByLocalRepoSource(repo) + snapshots := taskSnapshotCollection.ByLocalRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") } } - return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo) + return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo) }) } @@ -361,10 +414,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - repo, err := collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + repo, err := collection.ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -373,13 +429,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li resources := []string{string(repo.Key())} maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired (use captured `name` variable, not gin context) + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } out.Printf("Loading packages...\n") - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -388,7 +454,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li for _, ref := range b.PackageRefs { var p *deb.Package - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err) @@ -404,7 +470,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -511,6 +577,8 @@ func apiReposPackageFromDir(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -534,7 +602,17 @@ func apiReposPackageFromDir(c *gin.Context) { resources := []string{string(repo.Key())} resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -555,13 +633,13 @@ func apiReposPackageFromDir(c *gin.Context) { packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err) } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) + taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) processedFiles = append(processedFiles, otherFiles...) @@ -571,7 +649,7 @@ func apiReposPackageFromDir(c *gin.Context) { repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -650,6 +728,8 @@ func apiReposCopyPackage(c *gin.Context) { return } + // Load shallowly for 404 check and resource keys. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { @@ -673,12 +753,26 @@ func apiReposCopyPackage(c *gin.Context) { resources := []string{string(dstRepo.Key()), string(srcRepo.Key())} maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + // Task: Create fresh factory and collections inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of both repos after lock acquired + dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) } - err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo) + srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) } @@ -691,12 +785,12 @@ func apiReposCopyPackage(c *gin.Context) { RemovedLines: []string{}, } - dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress()) + dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err) } - srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress()) + srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err) } @@ -764,7 +858,7 @@ func apiReposCopyPackage(c *gin.Context) { } else { dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList)) - err = collectionFactory.LocalRepoCollection().Update(dstRepo) + err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -867,6 +961,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) { resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + var ( err error verifier = context.GetVerifier() @@ -882,8 +979,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) { changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, - repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), - context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) + repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(), + context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse) failedFiles = append(failedFiles, failedFiles2...) if err != nil { diff --git a/api/snapshot.go b/api/snapshot.go index 2c50566fe..a9061eb18 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -83,11 +83,9 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { } collectionFactory := context.NewCollectionFactory() - collection := collectionFactory.RemoteRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") - repo, err = collection.ByName(name) + repo, err = collectionFactory.RemoteRepoCollection().ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -97,12 +95,21 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { resources := []string{string(repo.Key()), "S" + b.Name} taskName := fmt.Sprintf("Create snapshot of mirror %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := repo.CheckLock() + taskCollectionFactory := context.NewCollectionFactory() + taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + repo, err := taskMirrorCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = repo.CheckLock() if err != nil { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err } - err = collection.LoadComplete(repo) + err = taskMirrorCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -116,7 +123,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { snapshot.Description = b.Description } - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -165,6 +172,7 @@ func apiSnapshotsCreate(c *gin.Context) { } } + // Phase 1: Pre-task validation (shallow load for 404 checks only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() var resources []string @@ -182,8 +190,20 @@ func apiSnapshotsCreate(c *gin.Context) { } maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - for i := range sources { - err = snapshotCollection.LoadComplete(sources[i]) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPackageCollection := taskCollectionFactory.PackageCollection() + + // Fresh load of all sources after lock acquired + freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots)) + for i := range b.SourceSnapshots { + freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i]) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + // LoadComplete on fresh copy + err = taskSnapshotCollection.LoadComplete(freshSources[i]) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -191,9 +211,9 @@ func apiSnapshotsCreate(c *gin.Context) { list := deb.NewPackageList() - // verify package refs and build package list + // verify package refs and build package list using fresh factory for _, ref := range b.PackageRefs { - p, err := collectionFactory.PackageCollection().ByKey([]byte(ref)) + p, err := taskPackageCollection.ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err) @@ -206,9 +226,9 @@ func apiSnapshotsCreate(c *gin.Context) { } } - snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description) + snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description) - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -249,11 +269,9 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { } collectionFactory := context.NewCollectionFactory() - collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") - repo, err = collection.ByName(name) + repo, err = collectionFactory.LocalRepoCollection().ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -263,7 +281,16 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { resources := []string{string(repo.Key()), "S" + b.Name} taskName := fmt.Sprintf("Create snapshot of repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.LoadComplete(repo) + taskCollectionFactory := context.NewCollectionFactory() + taskRepoCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + repo, err := taskRepoCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskRepoCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -277,7 +304,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { snapshot.Description = b.Description } - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -315,6 +342,7 @@ func apiSnapshotsUpdate(c *gin.Context) { return } + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() collection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") @@ -325,14 +353,38 @@ func apiSnapshotsUpdate(c *gin.Context) { return } + // Pre-task validation of new name if provided (skip if renaming to same name) + if b.Name != "" && b.Name != name { + _, err = collection.ByName(b.Name) + if err == nil { + AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)) + return + } + } + resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} taskName := fmt.Sprintf("Update snapshot %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - _, err := collection.ByName(b.Name) - if err == nil { - return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load after lock acquired + snapshot, err = taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + // Fresh duplicate check inside lock + if b.Name != "" { + _, err := taskCollection.ByName(b.Name) + if err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + } } + // Update fresh copy if b.Name != "" { snapshot.Name = b.Name } @@ -341,7 +393,7 @@ func apiSnapshotsUpdate(c *gin.Context) { snapshot.Description = b.Description } - err = collectionFactory.SnapshotCollection().Update(snapshot) + err = taskCollection.Update(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -395,9 +447,9 @@ func apiSnapshotsDrop(c *gin.Context) { name := c.Params.ByName("name") force := c.Request.URL.Query().Get("force") == "1" + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() snapshot, err := snapshotCollection.ByName(name) if err != nil { @@ -407,21 +459,35 @@ func apiSnapshotsDrop(c *gin.Context) { resources := []string{string(snapshot.ResourceKey())} taskName := fmt.Sprintf("Delete snapshot %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.BySnapshot(snapshot) + // Phase 2: Inside task lock - create fresh collections + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Fresh load after lock acquired + snapshot, err := taskSnapshotCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + // Fresh checks with current collections + published := taskPublishedCollection.BySnapshot(snapshot) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published") } if !force { - snapshots := snapshotCollection.BySnapshotSource(snapshot) + // Using fresh collection for dependency check + snapshots := taskSnapshotCollection.BySnapshotSource(snapshot) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") } } - err = snapshotCollection.Drop(snapshot) + err = taskSnapshotCollection.Drop(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -576,6 +642,7 @@ func apiSnapshotsMerge(c *gin.Context) { return } + // Phase 1: Pre-task validation (shallow load for 404 checks only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() @@ -592,32 +659,43 @@ func apiSnapshotsMerge(c *gin.Context) { } maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = snapshotCollection.LoadComplete(sources[0]) - if err != nil { - return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err - } - result := sources[0].RefList() - for i := 1; i < len(sources); i++ { - err = snapshotCollection.LoadComplete(sources[i]) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load of all sources inside task + freshSources := make([]*deb.Snapshot, len(body.Sources)) + for i := range body.Sources { + freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i]) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + // LoadComplete on fresh copy + err = taskSnapshotCollection.LoadComplete(freshSources[i]) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - result = result.Merge(sources[i].RefList(), overrideMatching, false) + } + + // Merge using fresh sources + result := freshSources[0].RefList() + for i := 1; i < len(freshSources); i++ { + result = result.Merge(freshSources[i].RefList(), overrideMatching, false) } if latest { result.FilterLatestRefs() } - sourceDescription := make([]string, len(sources)) - for i, s := range sources { + sourceDescription := make([]string, len(freshSources)) + for i, s := range freshSources { sourceDescription[i] = fmt.Sprintf("'%s'", s.Name) } - snapshot = deb.NewSnapshotFromRefList(name, sources, result, + snapshot = deb.NewSnapshotFromRefList(name, freshSources, result, fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", "))) - err = collectionFactory.SnapshotCollection().Add(snapshot) + err = taskCollectionFactory.SnapshotCollection().Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err) } @@ -701,21 +779,29 @@ func apiSnapshotsPull(c *gin.Context) { resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())} taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of snapshots after lock acquired + freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot) + err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } // convert snapshots to package list - toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress()) + toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress()) + sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -812,10 +898,10 @@ func apiSnapshotsPull(c *gin.Context) { } // Create snapshot - destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList, - fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", "))) + destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList, + fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", "))) - err = collectionFactory.SnapshotCollection().Add(destinationSnapshot) + err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } diff --git a/docker/test.Dockerfile b/docker/test.Dockerfile new file mode 100644 index 000000000..4ba65782c --- /dev/null +++ b/docker/test.Dockerfile @@ -0,0 +1,7 @@ +FROM aptly-dev + +ADD --chown=aptly:aptly . /work/src/ + +# Pre-populate the Go module cache so go mod verify works offline +RUN chown aptly /work/src && mkdir -p /work/src/.go && chown aptly /work/src/.go && \ + cd /work/src && sudo -u aptly GOPATH=/work/src/.go GOCACHE=/work/src/.go/cache go mod download diff --git a/system/t12_api/repos.py b/system/t12_api/repos.py index 424f9f49f..6448a5576 100644 --- a/system/t12_api/repos.py +++ b/system/t12_api/repos.py @@ -461,3 +461,34 @@ def check(self): self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']) + + +class ReposAPITestCreateEdit(APITest): + """ + POST /api/repos, + """ + def check(self): + repo_name = self.random_name() + ' with space' + repo_desc = {'Comment': 'fun repo', + 'DefaultComponent': 'contrib', + 'DefaultDistribution': 'bookworm', + 'Name': repo_name} + + resp = self.post("/api/repos", json=repo_desc) + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 201) + + repo_desc = {'Comment': 'modified repo', + 'DefaultComponent': 'main', + 'DefaultDistribution': 'trixie', + 'Name': repo_name + '@renamed'} + resp = self.put(f"/api/repos/{repo_name}", json=repo_desc) + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 200) + + resp = self.get("/api/repos/" + repo_name + '@renamed') + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 200) + + resp = self.delete("/api/repos/" + repo_name + '@renamed') + self.check_equal(resp.status_code, 200) diff --git a/task/list.go b/task/list.go index 5b9e9395e..4af459cdf 100644 --- a/task/list.go +++ b/task/list.go @@ -44,25 +44,27 @@ func (list *List) consumer() { for { select { case task := <-list.queue: + // Set task state to RUNNING before processing list.Lock() - { - task.State = RUNNING - } + task.State = RUNNING list.Unlock() go func() { retValue, err := task.process(aptly.Progress(task.output), task.detail) + // Update task completion state and cleanup with list lock held list.Lock() { - task.processReturnValue = retValue - task.err = err if err != nil { task.output.Printf("Task failed with error: %v", err) task.State = FAILED + task.err = err + task.processReturnValue = retValue } else { task.output.Print("Task succeeded") task.State = SUCCEEDED + task.err = nil + task.processReturnValue = retValue } list.usedResources.Free(task.resources) @@ -105,13 +107,15 @@ func (list *List) Stop() { // GetTasks gets complete list of tasks func (list *List) GetTasks() []Task { - tasks := []Task{} list.Lock() + defer list.Unlock() + + tasks := []Task{} for _, task := range list.tasks { + // Copy task while holding list lock tasks = append(tasks, *task) } - list.Unlock() return tasks } @@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) { // GetTaskByID returns task with given id func (list *List) GetTaskByID(ID int) (Task, error) { list.Lock() - tasks := list.tasks - list.Unlock() + defer list.Unlock() - for _, task := range tasks { + for _, task := range list.tasks { if task.ID == ID { + // Copy task while holding list lock return *task, nil } } @@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { // GetTaskReturnValueByID returns process return value of task with given id func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.processReturnValue, nil + } } - return task.processReturnValue, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } // RunTaskInBackground creates task and runs it in background. This will block until the necessary resources @@ -204,6 +211,10 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.wg.Add(1) task.wgTask.Add(1) + // Copy task while still holding the lock to avoid racing with consumer + // setting State=RUNNING after receiving from queue + taskCopy := *task + // add task to queue for processing if resources are available // if not, task will be queued by the consumer once resources are available tasks := list.usedResources.UsedBy(resources) @@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.Unlock() } - return *task, nil + return taskCopy, nil } // Clear removes finished tasks from list func (list *List) Clear() { list.Lock() + defer list.Unlock() var tasks []*Task for _, task := range list.tasks { @@ -230,8 +242,6 @@ func (list *List) Clear() { } } list.tasks = tasks - - list.Unlock() } // Wait waits till all tasks are processed @@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) { // GetTaskErrorByID returns the Task error for a given id func (list *List) GetTaskErrorByID(ID int) (error, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.err, nil + } } - return task.err, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } diff --git a/task/task.go b/task/task.go index 02aa7037b..04829675b 100644 --- a/task/task.go +++ b/task/task.go @@ -42,6 +42,7 @@ const ( ) // Task represents as task in a queue encapsulates process code +// All fields are protected by List.Mutex - access task fields only while holding list.Lock() type Task struct { output *Output detail *Detail