Skip to content

Commit

Permalink
add concurrency limits for tag lookup and untag
Browse files Browse the repository at this point in the history
Harbor is using the distribution for it's (harbor-registry) registry component.
The harbor GC will call into the registry to delete the manifest, which in turn
then does a lookup for all tags that reference the deleted manifest.
To find the tag references, the registry will iterate every tag in the repository
and read it's link file to check if it matches the deleted manifest (i.e. to see
if uses the same sha256 digest). So, the more tags in repository, the worse the
performance will be (as there will be more s3 API calls occurring for the tag
directory lookups and tag file reads).

Therefore, we can use concurrent lookup and untag to optimize performance as described in goharbor/harbor#12948.

P.S. This optimization was originally contributed by @Antiarchitect, now I would like to take it over.
Thanks @Antiarchitect's efforts with PR distribution#3890.

Signed-off-by: Liang Zheng <[email protected]>
  • Loading branch information
microyahoo authored and thcdrt committed Feb 13, 2025
1 parent 33439e7 commit 3c718b1
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cmd/registry/config-cache.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ storage:
maintenance:
uploadpurging:
enabled: false
tag:
concurrencylimit: 8
http:
addr: :5000
secret: asecretforlocaldevelopment
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry/config-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ storage:
maintenance:
uploadpurging:
enabled: false
tag:
concurrencylimit: 8
http:
addr: :5000
debug:
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry/config-example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ storage:
blobdescriptor: inmemory
filesystem:
rootdirectory: /var/lib/registry
tag:
concurrencylimit: 8
http:
addr: :5000
headers:
Expand Down
17 changes: 17 additions & 0 deletions configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ func (storage Storage) Type() string {
// allow configuration of delete
case "redirect":
// allow configuration of redirect
case "tag":
// allow configuration of tag
default:
storageType = append(storageType, k)
}
Expand All @@ -446,6 +448,19 @@ func (storage Storage) Type() string {
return ""
}

// TagParameters returns the Parameters map for a Storage tag configuration
func (storage Storage) TagParameters() Parameters {
return storage["tag"]
}

// setTagParameter changes the parameter at the provided key to the new value
func (storage Storage) setTagParameter(key string, value interface{}) {
if _, ok := storage["tag"]; !ok {
storage["tag"] = make(Parameters)
}
storage["tag"][key] = value
}

// Parameters returns the Parameters map for a Storage configuration
func (storage Storage) Parameters() Parameters {
return storage[storage.Type()]
Expand Down Expand Up @@ -474,6 +489,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error {
// allow configuration of delete
case "redirect":
// allow configuration of redirect
case "tag":
// allow configuration of tag
default:
types = append(types, k)
}
Expand Down
10 changes: 10 additions & 0 deletions configuration/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var configStruct = Configuration{
"host": nil,
"port": 42,
},
"tag": Parameters{
"concurrencylimit": 10,
},
},
Auth: Auth{
"silly": Parameters{
Expand Down Expand Up @@ -149,6 +152,8 @@ storage:
secretkey: SUPERSECRET
host: ~
port: 42
tag:
concurrencylimit: 10
auth:
silly:
realm: silly
Expand Down Expand Up @@ -537,6 +542,11 @@ func copyConfig(config Configuration) *Configuration {
for k, v := range config.Storage.Parameters() {
configCopy.Storage.setParameter(k, v)
}

for k, v := range config.Storage.TagParameters() {
configCopy.Storage.setTagParameter(k, v)
}

configCopy.Reporting = Reporting{
Bugsnag: BugsnagReporting{config.Reporting.Bugsnag.APIKey, config.Reporting.Bugsnag.ReleaseStage, config.Reporting.Bugsnag.Endpoint},
NewRelic: NewRelicReporting{config.Reporting.NewRelic.LicenseKey, config.Reporting.NewRelic.Name, config.Reporting.NewRelic.Verbose},
Expand Down
27 changes: 27 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ storage:
chunksize: optional size valye
rootdirectory: optional root directory
inmemory: # This driver takes no parameters
tag:
concurrencylimit: 8
delete:
enabled: false
redirect:
Expand Down Expand Up @@ -554,6 +556,31 @@ layer metadata.
> **NOTE**: Formerly, `blobdescriptor` was known as `layerinfo`. While these
> are equivalent, `layerinfo` has been deprecated.
If `blobdescriptor` is set to `inmemory`, the optional `blobdescriptorsize`
parameter sets a limit on the number of descriptors to store in the cache.
The default value is 10000. If this parameter is set to 0, the cache is allowed
to grow with no size limit.

### `tag`

The `tag` subsection provides configuration to set concurrency limit for tag lookup.
When user calls into the registry to delete the manifest, which in turn then does a
lookup for all tags that reference the deleted manifest. To find the tag references,
the registry will iterate every tag in the repository and read it's link file to check
if it matches the deleted manifest (i.e. to see if uses the same sha256 digest).
So, the more tags in repository, the worse the performance will be (as there will
be more S3 API calls occurring for the tag directory lookups and tag file reads if
using S3 storage driver).

Therefore, add a single flag `concurrencylimit` to set concurrency limit to optimize tag
lookup performance under the `tag` section. When a value is not provided or equal to 0,
`GOMAXPROCS` will be used.

```yaml
tag:
concurrencylimit: 8
```
### `redirect`

The `redirect` subsection provides configuration for managing redirects from
Expand Down
15 changes: 15 additions & 0 deletions registry/handlers/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
}
}

// configure tag lookup concurrency limit
if p := config.Storage.TagParameters(); p != nil {
l, ok := p["concurrencylimit"]
if ok {
limit, ok := l.(int)
if !ok {
panic("tag lookup concurrency limit config key must have a integer value")
}
if limit < 0 {
panic("tag lookup concurrency limit should be a non-negative integer value")
}
options = append(options, storage.TagLookupConcurrencyLimit(limit))
}
}

// configure redirects
var redirectDisabled bool
if redirectConfig, ok := config.Storage["redirect"]; ok {
Expand Down
25 changes: 21 additions & 4 deletions registry/handlers/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"

"github.com/docker/distribution"
dcontext "github.com/docker/distribution/context"
Expand All @@ -16,9 +17,11 @@ import (
"github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth"
"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
)

// These constants determine which architecture and OS to choose from a
Expand Down Expand Up @@ -520,12 +523,26 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques
return
}

var (
errs []error
mu sync.Mutex
)
g := errgroup.Group{}
g.SetLimit(storage.DefaultConcurrencyLimit)
for _, tag := range referencedTags {
if err := tagService.Untag(imh, tag); err != nil {
imh.Errors = append(imh.Errors, err)
return
}
tag := tag

g.Go(func() error {
if err := tagService.Untag(imh, tag); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
return nil
})
}
_ = g.Wait() // imh will record all errors, so ignore the error of Wait()
imh.Errors = errs

w.WriteHeader(http.StatusAccepted)
}
22 changes: 20 additions & 2 deletions registry/storage/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"regexp"
"runtime"

"github.com/docker/distribution"
"github.com/docker/distribution/reference"
Expand All @@ -11,6 +12,10 @@ import (
"github.com/docker/libtrust"
)

var (
DefaultConcurrencyLimit = runtime.GOMAXPROCS(0)
)

// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
Expand All @@ -20,6 +25,7 @@ type registry struct {
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool
schema1Enabled bool
tagLookupConcurrencyLimit int
resumableDigestEnabled bool
schema1SigningKey libtrust.PrivateKey
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
Expand All @@ -43,6 +49,13 @@ func EnableRedirect(registry *registry) error {
return nil
}

func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption {
return func(registry *registry) error {
registry.tagLookupConcurrencyLimit = concurrencyLimit
return nil
}
}

// EnableDelete is a functional option for NewRegistry. It enables deletion on
// the registry.
func EnableDelete(registry *registry) error {
Expand Down Expand Up @@ -203,9 +216,14 @@ func (repo *repository) Named() reference.Named {
}

func (repo *repository) Tags(ctx context.Context) distribution.TagService {
limit := DefaultConcurrencyLimit
if repo.tagLookupConcurrencyLimit > 0 {
limit = repo.tagLookupConcurrencyLimit
}
tags := &tagStore{
repository: repo,
blobStore: repo.registry.blobStore,
repository: repo,
blobStore: repo.registry.blobStore,
concurrencyLimit: limit,
}

return tags
Expand Down
59 changes: 42 additions & 17 deletions registry/storage/tagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package storage
import (
"context"
"path"
"sync"

"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/opencontainers/go-digest"
"golang.org/x/sync/errgroup"
)

var _ distribution.TagService = &tagStore{}
Expand All @@ -17,8 +19,9 @@ var _ distribution.TagService = &tagStore{}
// which only makes use of the Digest field of the returned distribution.Descriptor
// but does not enable full roundtripping of Descriptor objects
type tagStore struct {
repository *repository
blobStore *blobStore
repository *repository
blobStore *blobStore
concurrencyLimit int
}

// All returns all tags
Expand Down Expand Up @@ -153,26 +156,48 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([
return nil, err
}

var tags []string
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(ts.concurrencyLimit)

var (
tags []string
mu sync.Mutex
)
for _, tag := range allTags {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
if ctx.Err() != nil {
break
}
tag := tag

tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
continue
g.Go(func() error {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
}
return nil, err
}

if tagDigest == desc.Digest {
tags = append(tags, tag)
}
tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
return nil
}
return err
}

if tagDigest == desc.Digest {
mu.Lock()
tags = append(tags, tag)
mu.Unlock()
}

return nil
})
}

err = g.Wait()
if err != nil {
return nil, err
}

return tags, nil
Expand Down

0 comments on commit 3c718b1

Please sign in to comment.