From bed7d5cdc88a027252398c7e8e75f81fa27cee8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Wed, 20 Jul 2016 17:01:09 +0200 Subject: [PATCH 01/12] UPSTREAM: docker/distribution: 1757: Export storage.CreateOptions in top-level package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop this during next distribution rebase. Revert also reverts "UPSTREAM: docker/distribution: : export storage.CreateOptions" (commit 8c8bc43bcc2dd921ca3076f64014eac4e0716acb) Signed-off-by: Michal Minář --- vendor/github.com/docker/distribution/blobs.go | 9 +++++++++ .../distribution/registry/client/repository.go | 13 ++----------- .../registry/storage/linkedblobstore.go | 13 ++----------- 3 files changed, 13 insertions(+), 22 deletions(-) diff --git a/vendor/github.com/docker/distribution/blobs.go b/vendor/github.com/docker/distribution/blobs.go index d125330117f8..400bc346ee9b 100644 --- a/vendor/github.com/docker/distribution/blobs.go +++ b/vendor/github.com/docker/distribution/blobs.go @@ -192,6 +192,15 @@ type BlobCreateOption interface { Apply(interface{}) error } +// CreateOptions is a collection of blob creation modifiers relevant to general +// blob storage intended to be configured by the BlobCreateOption.Apply method. +type CreateOptions struct { + Mount struct { + ShouldMount bool + From reference.Canonical + } +} + // BlobWriter provides a handle for inserting data into a blob store. // Instances should be obtained from BlobWriteService.Writer and // BlobWriteService.Resume. 
If supported by the store, a writer can be diff --git a/vendor/github.com/docker/distribution/registry/client/repository.go b/vendor/github.com/docker/distribution/registry/client/repository.go index 8cc5f7f9aa03..0a724bb03eca 100644 --- a/vendor/github.com/docker/distribution/registry/client/repository.go +++ b/vendor/github.com/docker/distribution/registry/client/repository.go @@ -649,15 +649,6 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut return writer.Commit(ctx, desc) } -// createOptions is a collection of blob creation modifiers relevant to general -// blob storage intended to be configured by the BlobCreateOption.Apply method. -type createOptions struct { - Mount struct { - ShouldMount bool - From reference.Canonical - } -} - type optionFunc func(interface{}) error func (f optionFunc) Apply(v interface{}) error { @@ -668,7 +659,7 @@ func (f optionFunc) Apply(v interface{}) error { // mounted from the given canonical reference. func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { return optionFunc(func(v interface{}) error { - opts, ok := v.(*createOptions) + opts, ok := v.(*distribution.CreateOptions) if !ok { return fmt.Errorf("unexpected options type: %T", v) } @@ -681,7 +672,7 @@ func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { } func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { - var opts createOptions + var opts distribution.CreateOptions for _, option := range options { err := option.Apply(&opts) diff --git a/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go b/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go index 38b9e71aa281..0af3ae2f59a1 100644 --- a/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go +++ b/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go @@ -101,15 +101,6 @@ func (lbs 
*linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) return desc, lbs.linkBlob(ctx, desc) } -// CreateOptions is a collection of blob creation modifiers relevant to general -// blob storage intended to be configured by the BlobCreateOption.Apply method. -type CreateOptions struct { - Mount struct { - ShouldMount bool - From reference.Canonical - } -} - type optionFunc func(interface{}) error func (f optionFunc) Apply(v interface{}) error { @@ -120,7 +111,7 @@ func (f optionFunc) Apply(v interface{}) error { // mounted from the given canonical reference. func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { return optionFunc(func(v interface{}) error { - opts, ok := v.(*CreateOptions) + opts, ok := v.(*distribution.CreateOptions) if !ok { return fmt.Errorf("unexpected options type: %T", v) } @@ -136,7 +127,7 @@ func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer") - var opts CreateOptions + var opts distribution.CreateOptions for _, option := range options { err := option.Apply(&opts) From 014a0b19f77f56da0183b7de6b2b525331cf4260 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Wed, 20 Jul 2016 16:13:23 -0400 Subject: [PATCH 02/12] UPSTREAM: docker/distribution: 1857: Provide stat descriptor for Create method during cross-repo mount --- .../github.com/docker/distribution/blobs.go | 1 + .../registry/storage/linkedblobstore.go | 25 ++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/vendor/github.com/docker/distribution/blobs.go b/vendor/github.com/docker/distribution/blobs.go index 400bc346ee9b..9788126eb398 100644 --- a/vendor/github.com/docker/distribution/blobs.go +++ b/vendor/github.com/docker/distribution/blobs.go @@ -198,6 +198,7 @@ type CreateOptions struct { Mount struct { 
ShouldMount bool From reference.Canonical + Stat *Descriptor } } diff --git a/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go b/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go index 0af3ae2f59a1..5f720a3bb36e 100644 --- a/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go +++ b/vendor/github.com/docker/distribution/registry/storage/linkedblobstore.go @@ -137,7 +137,7 @@ func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution. } if opts.Mount.ShouldMount { - desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest()) + desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest(), opts.Mount.Stat) if err == nil { // Mount successful, no need to initiate an upload session return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} @@ -280,14 +280,21 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest. return nil } -func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { - repo, err := lbs.registry.Repository(ctx, sourceRepo) - if err != nil { - return distribution.Descriptor{}, err - } - stat, err := repo.Blobs(ctx).Stat(ctx, dgst) - if err != nil { - return distribution.Descriptor{}, err +func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) { + var stat distribution.Descriptor + if sourceStat == nil { + // look up the blob info from the sourceRepo if not already provided + repo, err := lbs.registry.Repository(ctx, sourceRepo) + if err != nil { + return distribution.Descriptor{}, err + } + stat, err = repo.Blobs(ctx).Stat(ctx, dgst) + if err != nil { + return distribution.Descriptor{}, err + } + } else { + // use the provided blob info + stat = *sourceStat } desc := distribution.Descriptor{ From 
101f5e439e7eebba00244ac60518c03029476f51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Wed, 27 Jul 2016 09:59:39 +0200 Subject: [PATCH 03/12] UPSTREAM: docker/distribution: : added missing testutil package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michal Minář --- .../docker/distribution/testutil/handler.go | 148 ++++++++++++++++++ .../docker/distribution/testutil/manifests.go | 87 ++++++++++ .../docker/distribution/testutil/tarfile.go | 116 ++++++++++++++ 3 files changed, 351 insertions(+) create mode 100644 vendor/github.com/docker/distribution/testutil/handler.go create mode 100644 vendor/github.com/docker/distribution/testutil/manifests.go create mode 100644 vendor/github.com/docker/distribution/testutil/tarfile.go diff --git a/vendor/github.com/docker/distribution/testutil/handler.go b/vendor/github.com/docker/distribution/testutil/handler.go new file mode 100644 index 000000000000..00cd8a6ac291 --- /dev/null +++ b/vendor/github.com/docker/distribution/testutil/handler.go @@ -0,0 +1,148 @@ +package testutil + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "sort" + "strings" +) + +// RequestResponseMap is an ordered mapping from Requests to Responses +type RequestResponseMap []RequestResponseMapping + +// RequestResponseMapping defines a Response to be sent in response to a given +// Request +type RequestResponseMapping struct { + Request Request + Response Response +} + +// Request is a simplified http.Request object +type Request struct { + // Method is the http method of the request, for example GET + Method string + + // Route is the http route of this request + Route string + + // QueryParams are the query parameters of this request + QueryParams map[string][]string + + // Body is the byte contents of the http request + Body []byte + + // Headers are the header for this request + Headers http.Header +} + +func (r Request) String() string 
{ + queryString := "" + if len(r.QueryParams) > 0 { + keys := make([]string, 0, len(r.QueryParams)) + queryParts := make([]string, 0, len(r.QueryParams)) + for k := range r.QueryParams { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + for _, val := range r.QueryParams[k] { + queryParts = append(queryParts, fmt.Sprintf("%s=%s", k, url.QueryEscape(val))) + } + } + queryString = "?" + strings.Join(queryParts, "&") + } + var headers []string + if len(r.Headers) > 0 { + var headerKeys []string + for k := range r.Headers { + headerKeys = append(headerKeys, k) + } + sort.Strings(headerKeys) + + for _, k := range headerKeys { + for _, val := range r.Headers[k] { + headers = append(headers, fmt.Sprintf("%s:%s", k, val)) + } + } + + } + return fmt.Sprintf("%s %s%s\n%s\n%s", r.Method, r.Route, queryString, headers, r.Body) +} + +// Response is a simplified http.Response object +type Response struct { + // Statuscode is the http status code of the Response + StatusCode int + + // Headers are the http headers of this Response + Headers http.Header + + // Body is the response body + Body []byte +} + +// testHandler is an http.Handler with a defined mapping from Request to an +// ordered list of Response objects +type testHandler struct { + responseMap map[string][]Response +} + +// NewHandler returns a new test handler that responds to defined requests +// with specified responses +// Each time a Request is received, the next Response is returned in the +// mapping, until no Responses are defined, at which point a 404 is sent back +func NewHandler(requestResponseMap RequestResponseMap) http.Handler { + responseMap := make(map[string][]Response) + for _, mapping := range requestResponseMap { + responses, ok := responseMap[mapping.Request.String()] + if ok { + responseMap[mapping.Request.String()] = append(responses, mapping.Response) + } else { + responseMap[mapping.Request.String()] = []Response{mapping.Response} + } + } + return 
&testHandler{responseMap: responseMap} +} + +func (app *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + requestBody, _ := ioutil.ReadAll(r.Body) + request := Request{ + Method: r.Method, + Route: r.URL.Path, + QueryParams: r.URL.Query(), + Body: requestBody, + Headers: make(map[string][]string), + } + + // Add headers of interest here + for k, v := range r.Header { + if k == "If-None-Match" { + request.Headers[k] = v + } + } + + responses, ok := app.responseMap[request.String()] + + if !ok || len(responses) == 0 { + http.NotFound(w, r) + return + } + + response := responses[0] + app.responseMap[request.String()] = responses[1:] + + responseHeader := w.Header() + for k, v := range response.Headers { + responseHeader[k] = v + } + + w.WriteHeader(response.StatusCode) + + io.Copy(w, bytes.NewReader(response.Body)) +} diff --git a/vendor/github.com/docker/distribution/testutil/manifests.go b/vendor/github.com/docker/distribution/testutil/manifests.go new file mode 100644 index 000000000000..c4f9fef53a84 --- /dev/null +++ b/vendor/github.com/docker/distribution/testutil/manifests.go @@ -0,0 +1,87 @@ +package testutil + +import ( + "fmt" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/docker/libtrust" +) + +// MakeManifestList constructs a manifest list out of a list of manifest digests +func MakeManifestList(blobstatter distribution.BlobStatter, manifestDigests []digest.Digest) (*manifestlist.DeserializedManifestList, error) { + ctx := context.Background() + + var manifestDescriptors []manifestlist.ManifestDescriptor + for _, manifestDigest := range manifestDigests { + descriptor, err := blobstatter.Stat(ctx, manifestDigest) + if err != nil 
{ + return nil, err + } + platformSpec := manifestlist.PlatformSpec{ + Architecture: "atari2600", + OS: "CP/M", + Variant: "ternary", + Features: []string{"VLIW", "superscalaroutoforderdevnull"}, + } + manifestDescriptor := manifestlist.ManifestDescriptor{ + Descriptor: descriptor, + Platform: platformSpec, + } + manifestDescriptors = append(manifestDescriptors, manifestDescriptor) + } + + return manifestlist.FromDescriptors(manifestDescriptors) +} + +// MakeSchema1Manifest constructs a schema 1 manifest from a given list of digests and returns +// the digest of the manifest +func MakeSchema1Manifest(digests []digest.Digest) (distribution.Manifest, error) { + manifest := schema1.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: "who", + Tag: "cares", + } + + for _, digest := range digests { + manifest.FSLayers = append(manifest.FSLayers, schema1.FSLayer{BlobSum: digest}) + manifest.History = append(manifest.History, schema1.History{V1Compatibility: ""}) + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + return nil, fmt.Errorf("unexpected error generating private key: %v", err) + } + + signedManifest, err := schema1.Sign(&manifest, pk) + if err != nil { + return nil, fmt.Errorf("error signing manifest: %v", err) + } + + return signedManifest, nil +} + +// MakeSchema2Manifest constructs a schema 2 manifest from a given list of digests and returns +// the digest of the manifest +func MakeSchema2Manifest(repository distribution.Repository, digests []digest.Digest) (distribution.Manifest, error) { + ctx := context.Background() + blobStore := repository.Blobs(ctx) + builder := schema2.NewManifestBuilder(blobStore, []byte{}) + for _, digest := range digests { + builder.AppendReference(distribution.Descriptor{Digest: digest}) + } + + manifest, err := builder.Build(ctx) + if err != nil { + return nil, fmt.Errorf("unexpected error generating manifest: %v", err) + } + + return manifest, nil +} diff --git 
a/vendor/github.com/docker/distribution/testutil/tarfile.go b/vendor/github.com/docker/distribution/testutil/tarfile.go new file mode 100644 index 000000000000..baa5ac5acbe7 --- /dev/null +++ b/vendor/github.com/docker/distribution/testutil/tarfile.go @@ -0,0 +1,116 @@ +package testutil + +import ( + "archive/tar" + "bytes" + "crypto/rand" + "fmt" + "io" + mrand "math/rand" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" +) + +// CreateRandomTarFile creates a random tarfile, returning it as an +// io.ReadSeeker along with its digest. An error is returned if there is a +// problem generating valid content. +func CreateRandomTarFile() (rs io.ReadSeeker, dgst digest.Digest, err error) { + nFiles := mrand.Intn(10) + 10 + target := &bytes.Buffer{} + wr := tar.NewWriter(target) + + // Perturb this on each iteration of the loop below. + header := &tar.Header{ + Mode: 0644, + ModTime: time.Now(), + Typeflag: tar.TypeReg, + Uname: "randocalrissian", + Gname: "cloudcity", + AccessTime: time.Now(), + ChangeTime: time.Now(), + } + + for fileNumber := 0; fileNumber < nFiles; fileNumber++ { + fileSize := mrand.Int63n(1<<20) + 1<<20 + + header.Name = fmt.Sprint(fileNumber) + header.Size = fileSize + + if err := wr.WriteHeader(header); err != nil { + return nil, "", err + } + + randomData := make([]byte, fileSize) + + // Fill up the buffer with some random data. 
+ n, err := rand.Read(randomData) + + if n != len(randomData) { + return nil, "", fmt.Errorf("short read creating random reader: %v bytes != %v bytes", n, len(randomData)) + } + + if err != nil { + return nil, "", err + } + + nn, err := io.Copy(wr, bytes.NewReader(randomData)) + if nn != fileSize { + return nil, "", fmt.Errorf("short copy writing random file to tar") + } + + if err != nil { + return nil, "", err + } + + if err := wr.Flush(); err != nil { + return nil, "", err + } + } + + if err := wr.Close(); err != nil { + return nil, "", err + } + + dgst = digest.FromBytes(target.Bytes()) + + return bytes.NewReader(target.Bytes()), dgst, nil +} + +// CreateRandomLayers returns a map of n digests. We don't particularly care +// about the order of said digests (since they're all random anyway). +func CreateRandomLayers(n int) (map[digest.Digest]io.ReadSeeker, error) { + digestMap := map[digest.Digest]io.ReadSeeker{} + for i := 0; i < n; i++ { + rs, ds, err := CreateRandomTarFile() + if err != nil { + return nil, fmt.Errorf("unexpected error generating test layer file: %v", err) + } + + dgst := digest.Digest(ds) + digestMap[dgst] = rs + } + return digestMap, nil +} + +// UploadBlobs lets you upload blobs to a repository +func UploadBlobs(repository distribution.Repository, layers map[digest.Digest]io.ReadSeeker) error { + ctx := context.Background() + for digest, rs := range layers { + wr, err := repository.Blobs(ctx).Create(ctx) + if err != nil { + return fmt.Errorf("unexpected error creating upload: %v", err) + } + + if _, err := io.Copy(wr, rs); err != nil { + return fmt.Errorf("unexpected error copying to upload: %v", err) + } + + if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: digest}); err != nil { + return fmt.Errorf("unexpected error committinng upload: %v", err) + } + } + return nil +} From f1ecb1d247fadacde12491113fad25fedd1169ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Mon, 11 Jul 2016 14:31:53 +0200 Subject: 
[PATCH 04/12] Store media type in image MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michal Minář --- pkg/image/api/helper.go | 10 +++++++--- pkg/image/api/helper_test.go | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/image/api/helper.go b/pkg/image/api/helper.go index 657ce483de22..bbef38db1424 100644 --- a/pkg/image/api/helper.go +++ b/pkg/image/api/helper.go @@ -434,14 +434,14 @@ func ImageConfigMatchesImage(image *Image, imageConfig []byte) (bool, error) { return v.Verified(), nil } -// ImageWithMetadata returns a copy of image with the DockerImageMetadata filled in -// from the raw DockerImageManifest data stored in the image. +// ImageWithMetadata mutates the given image. It parses raw DockerImageManifest data stored in the image and +// fills its DockerImageMetadata and other fields. func ImageWithMetadata(image *Image) error { if len(image.DockerImageManifest) == 0 { return nil } - if len(image.DockerImageLayers) > 0 && image.DockerImageMetadata.Size > 0 { + if len(image.DockerImageLayers) > 0 && image.DockerImageMetadata.Size > 0 && len(image.DockerImageManifestMediaType) > 0 { glog.V(5).Infof("Image metadata already filled for %s", image.Name) // don't update image already filled return nil @@ -463,6 +463,8 @@ func ImageWithMetadata(image *Image) error { return nil } + image.DockerImageManifestMediaType = schema1.MediaTypeManifest + v1Metadata := DockerV1CompatibilityImage{} if err := json.Unmarshal([]byte(manifest.History[0].DockerV1Compatibility), &v1Metadata); err != nil { return err @@ -523,6 +525,8 @@ func ImageWithMetadata(image *Image) error { return err } + image.DockerImageManifestMediaType = schema2.MediaTypeManifest + image.DockerImageLayers = make([]ImageLayer, len(manifest.Layers)) for i, layer := range manifest.Layers { image.DockerImageLayers[i].Name = layer.Digest diff --git a/pkg/image/api/helper_test.go b/pkg/image/api/helper_test.go index 
ad1967cd4049..a70908f8e731 100644 --- a/pkg/image/api/helper_test.go +++ b/pkg/image/api/helper_test.go @@ -637,7 +637,6 @@ func validImageWithManifestV2Data() Image { ObjectMeta: kapi.ObjectMeta{ Name: "id", }, - DockerImageManifestMediaType: "application/vnd.docker.container.image.v1+json", DockerImageConfig: `{ "architecture": "amd64", "config": { @@ -815,6 +814,7 @@ func TestImageWithMetadata(t *testing.T) { {Name: "tarsum.dev+sha256:b194de3772ebbcdc8f244f663669799ac1cb141834b7cb8b69100285d357a2b0", MediaType: "application/vnd.docker.container.image.rootfs.diff+x-gtar", LayerSize: 1895}, {Name: "tarsum.dev+sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", MediaType: "application/vnd.docker.container.image.rootfs.diff+x-gtar", LayerSize: 0}, }, + DockerImageManifestMediaType: "application/vnd.docker.distribution.manifest.v1+json", DockerImageMetadata: DockerImage{ ID: "2d24f826cb16146e2016ff349a8a33ed5830f3b938d45c0f82943f4ab8c097e7", Parent: "117ee323aaa9d1b136ea55e4421f4ce413dfc6c0cc6b2186dea6c88d93e1ad7c", @@ -887,7 +887,7 @@ func TestImageWithMetadata(t *testing.T) { }, DockerImageConfig: validImageWithManifestV2Data().DockerImageConfig, DockerImageManifest: validImageWithManifestV2Data().DockerImageManifest, - DockerImageManifestMediaType: "application/vnd.docker.container.image.v1+json", + DockerImageManifestMediaType: "application/vnd.docker.distribution.manifest.v2+json", DockerImageLayers: []ImageLayer{ {Name: "sha256:b4ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4", MediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip", LayerSize: 639152}, {Name: "sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa", MediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip", LayerSize: 235231}, From a9952b81d773b9ce917ae24b320853d51060d5d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Thu, 14 Jul 2016 10:09:23 +0200 Subject: [PATCH 05/12] Check for blob existence 
before serving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Modified blobDescriptorService.Stat, which used to bypass layer link check, to verify that requested blob exists either as a layer link locally in repository or is referenced in corresponding image stream. The latter is obtained from etcd by fetching image stream and all its images until the blob is found. Internal digest cache is used to store cache blob <-> repository pairs to reduce number of etcd queries. Signed-off-by: Michal Minář --- pkg/dockerregistry/server/auth.go | 16 -- .../server/blobdescriptorservice.go | 143 +++++++++++++++++- pkg/dockerregistry/server/errorblobstore.go | 84 +++++++--- .../server/repositorymiddleware.go | 67 +++++--- pkg/dockerregistry/server/util.go | 73 +++++++++ 5 files changed, 319 insertions(+), 64 deletions(-) create mode 100644 pkg/dockerregistry/server/util.go diff --git a/pkg/dockerregistry/server/auth.go b/pkg/dockerregistry/server/auth.go index 8f98a5d48c6a..864656cf3f32 100644 --- a/pkg/dockerregistry/server/auth.go +++ b/pkg/dockerregistry/server/auth.go @@ -327,22 +327,6 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg return WithUserClient(ctx, osClient), nil } -func getNamespaceName(resourceName string) (string, string, error) { - repoParts := strings.SplitN(resourceName, "/", 2) - if len(repoParts) != 2 { - return "", "", ErrNamespaceRequired - } - ns := repoParts[0] - if len(ns) == 0 { - return "", "", ErrNamespaceRequired - } - name := repoParts[1] - if len(name) == 0 { - return "", "", ErrNamespaceRequired - } - return ns, name, nil -} - func getOpenShiftAPIToken(ctx context.Context, req *http.Request) (string, error) { token := "" diff --git a/pkg/dockerregistry/server/blobdescriptorservice.go b/pkg/dockerregistry/server/blobdescriptorservice.go index 7ace8d86fb11..825c1293878a 100644 --- a/pkg/dockerregistry/server/blobdescriptorservice.go +++ 
b/pkg/dockerregistry/server/blobdescriptorservice.go @@ -1,13 +1,26 @@ package server import ( + "sort" + "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/registry/middleware/registry" "github.com/docker/distribution/registry/storage" + + kerrors "k8s.io/kubernetes/pkg/api/errors" + + imageapi "github.com/openshift/origin/pkg/image/api" ) +// ByGeneration allows for sorting tag events from latest to oldest. +type ByGeneration []*imageapi.TagEvent + +func (b ByGeneration) Less(i, j int) bool { return b[i].Generation > b[j].Generation } +func (b ByGeneration) Len() int { return len(b) } +func (b ByGeneration) Swap(i, j int) { b[i], b[j] = b[j], b[i] } + func init() { middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{})) } @@ -25,6 +38,134 @@ type blobDescriptorService struct { distribution.BlobDescriptorService } +// Stat returns a a blob descriptor if the given blob is either linked in repository or is referenced in +// corresponding image stream. This method is invoked from inside of upstream's linkedBlobStore. It expects +// a proper repository object to be set on given context by upper openshift middleware wrappers. 
func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { - return dockerRegistry.BlobStatter().Stat(ctx, dgst) + // if there is a repo layer link, return its descriptor + desc, statErr := bs.BlobDescriptorService.Stat(ctx, dgst) + if statErr == nil { + return desc, nil + } + + // verify the blob is stored locally + desc, err := dockerRegistry.BlobStatter().Stat(ctx, dgst) + if err != nil { + return desc, err + } + + // verify that an image holding this blob is actually present in the image stream + repo, found := RepositoryFrom(ctx) + if !found { + context.GetLogger(ctx).Errorf("failed to retrieve repository from context") + return distribution.Descriptor{}, statErr + } + + if imageStreamHasBlob(repo, dgst) { + return desc, nil + } + + return distribution.Descriptor{}, distribution.ErrBlobUnknown +} + +// imageStreamHasBlob returns true if the given blob digest is referenced in image stream corresponding to +// given repository. If not found locally, image stream's images will be iterated and fetched from newest to +// oldest until found. Each processed image will update local cache of blobs. 
+func imageStreamHasBlob(r *repository, dgst digest.Digest) bool { + repositories := r.cachedLayers.RepositoriesForDigest(dgst) + match := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name}.Exact() + for _, repo := range repositories { + if repo == match { + context.GetLogger(r.ctx).Debugf("found cached blob %q in repository %s/%s", dgst.String(), r.namespace, r.name) + return true + } + } + + // verify directly with etcd + is, err := r.getImageStream() + if err != nil { + context.GetLogger(r.ctx).Errorf("failed to get image stream: %v", err) + return false + } + + tagEvents := []*imageapi.TagEvent{} + event2Name := make(map[*imageapi.TagEvent]string) + for name, eventList := range is.Status.Tags { + for i := range eventList.Items { + event := &eventList.Items[i] + tagEvents = append(tagEvents, event) + event2Name[event] = name + } + } + // search from youngest to oldest + sort.Sort(ByGeneration(tagEvents)) + + processedImages := map[string]struct{}{} + + for _, tagEvent := range tagEvents { + if _, processed := processedImages[tagEvent.Image]; processed { + continue + } + if imageHasBlob(r, match, tagEvent.Image, dgst.String(), !r.pullthrough) { + tagName := event2Name[tagEvent] + context.GetLogger(r.ctx).Debugf("blob found under istag %s/%s:%s in image %s", r.namespace, r.name, tagName, tagEvent.Image) + return true + } + processedImages[tagEvent.Image] = struct{}{} + } + + context.GetLogger(r.ctx).Warnf("blob %q exists locally but is not referenced in repository %s/%s", dgst.String(), r.namespace, r.name) + + return false +} + +// imageHasBlob returns true if the image identified by imageName refers to the given blob. The image is +// fetched. If requireManaged is true and the image is not managed (it refers to remote registry), the image +// will not be processed. Fetched image will update local cache of blobs -> repositories with (blobDigest, +// cacheName) pairs. 
+func imageHasBlob( + r *repository, + cacheName, + imageName, + blobDigest string, + requireManaged bool, +) bool { + image, err := r.getImage(digest.Digest(imageName)) + if err != nil { + if kerrors.IsNotFound(err) { + context.GetLogger(r.ctx).Debugf("image %q not found: imageName") + } else { + context.GetLogger(r.ctx).Errorf("failed to get image: %v", err) + } + return false + } + + // in case of pullthrough disabled, client won't be able to download a blob belonging to not managed image + // (image stored in external registry), thus don't consider them as candidates + if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; requireManaged && managed != "true" { + context.GetLogger(r.ctx).Debugf("skipping not managed image") + return false + } + + if len(image.DockerImageLayers) == 0 { + if len(image.DockerImageManifestMediaType) > 0 { + // image has no layers + return false + } + err = imageapi.ImageWithMetadata(image) + if err != nil { + context.GetLogger(r.ctx).Errorf("failed to get metadata for image %s: %v", imageName, err) + return false + } + } + + for _, layer := range image.DockerImageLayers { + if layer.Name == blobDigest { + // remember all the layers of matching image + r.rememberLayersOfImage(image, cacheName) + return true + } + } + + return false } diff --git a/pkg/dockerregistry/server/errorblobstore.go b/pkg/dockerregistry/server/errorblobstore.go index b1627d09c6fe..af8dc3f5e822 100644 --- a/pkg/dockerregistry/server/errorblobstore.go +++ b/pkg/dockerregistry/server/errorblobstore.go @@ -7,7 +7,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/reference" ) // errorBlobStore wraps a distribution.BlobStore for a particular repo. 
@@ -23,44 +23,57 @@ func (r *errorBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribu if err := r.repo.checkPendingErrors(ctx); err != nil { return distribution.Descriptor{}, err } - return r.store.Stat(ctx, dgst) + return r.store.Stat(WithRepository(ctx, r.repo), dgst) } func (r *errorBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } - return r.store.Get(ctx, dgst) + return r.store.Get(WithRepository(ctx, r.repo), dgst) } func (r *errorBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } - return r.store.Open(ctx, dgst) + return r.store.Open(WithRepository(ctx, r.repo), dgst) } func (r *errorBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { if err := r.repo.checkPendingErrors(ctx); err != nil { return distribution.Descriptor{}, err } - return r.store.Put(ctx, mediaType, p) + return r.store.Put(WithRepository(ctx, r.repo), mediaType, p) } func (r *errorBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { + var desc distribution.Descriptor + if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } + ctx = WithRepository(ctx, r.repo) + opts, err := effectiveCreateOptions(options) if err != nil { return nil, err } - if err := checkPendingCrossMountErrors(ctx, opts); err != nil { - context.GetLogger(r.repo.ctx).Debugf("disabling cross-mount because of pending error: %v", err) + err = checkPendingCrossMountErrors(ctx, opts) + if err == nil && opts.Mount.ShouldMount { + desc, err = statSourceRepository(ctx, opts.Mount.From, opts.Mount.From.Digest()) + } + + if err != nil { + context.GetLogger(ctx).Infof("disabling cross-repo mount because of an error: %v", err) + options = append(options, 
guardCreateOptions{DisableCrossMount: true}) + } else if !opts.Mount.ShouldMount { + context.GetLogger(ctx).Infof("ensuring cross-repo is disabled: %v", err) options = append(options, guardCreateOptions{DisableCrossMount: true}) } else { - options = append(options, guardCreateOptions{}) + context.GetLogger(ctx).Debugf("attempting cross-repo mount") + options = append(options, statCrossMountCreateOptions{desc: desc}) } return r.store.Create(ctx, options...) @@ -70,36 +83,27 @@ func (r *errorBlobStore) Resume(ctx context.Context, id string) (distribution.Bl if err := r.repo.checkPendingErrors(ctx); err != nil { return nil, err } - return r.store.Resume(ctx, id) + return r.store.Resume(WithRepository(ctx, r.repo), id) } func (r *errorBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error { if err := r.repo.checkPendingErrors(ctx); err != nil { return err } - return r.store.ServeBlob(ctx, w, req, dgst) + return r.store.ServeBlob(WithRepository(ctx, r.repo), w, req, dgst) } func (r *errorBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { if err := r.repo.checkPendingErrors(ctx); err != nil { return err } - return r.store.Delete(ctx, dgst) + return r.store.Delete(WithRepository(ctx, r.repo), dgst) } -// Find out what the blob creation options are going to do by dry-running them -func effectiveCreateOptions(options []distribution.BlobCreateOption) (*storage.CreateOptions, error) { - opts := &storage.CreateOptions{} - for _, createOptions := range options { - err := createOptions.Apply(opts) - if err != nil { - return nil, err - } - } - return opts, nil -} - -func checkPendingCrossMountErrors(ctx context.Context, opts *storage.CreateOptions) error { +// checkPendingCrossMountErrors returns true if a cross-repo mount has been requested with given create +// options. If requested and there are pending authorization errors for source repository, the error will be +// returned. 
Cross-repo mount must not be allowed in case of error. +func checkPendingCrossMountErrors(ctx context.Context, opts *distribution.CreateOptions) error { if !opts.Mount.ShouldMount { return nil } @@ -118,7 +122,7 @@ type guardCreateOptions struct { var _ distribution.BlobCreateOption = guardCreateOptions{} func (f guardCreateOptions) Apply(v interface{}) error { - opts, ok := v.(*storage.CreateOptions) + opts, ok := v.(*distribution.CreateOptions) if !ok { return fmt.Errorf("Unexpected create options: %#v", v) } @@ -127,3 +131,33 @@ func (f guardCreateOptions) Apply(v interface{}) error { } return nil } + +// statCrossMountCreateOptions ensures the expected options type is passed, and optionally pre-fills the cross-mount stat info +type statCrossMountCreateOptions struct { + desc distribution.Descriptor +} + +var _ distribution.BlobCreateOption = statCrossMountCreateOptions{} + +func (f statCrossMountCreateOptions) Apply(v interface{}) error { + opts, ok := v.(*distribution.CreateOptions) + if !ok { + return fmt.Errorf("Unexpected create options: %#v", v) + } + + if !opts.Mount.ShouldMount { + return nil + } + + opts.Mount.Stat = &f.desc + + return nil +} + +func statSourceRepository(ctx context.Context, sourceRepoName reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { + repo, err := dockerRegistry.Repository(ctx, sourceRepoName) + if err != nil { + return distribution.Descriptor{}, err + } + return repo.Blobs(ctx).Stat(ctx, dgst) +} diff --git a/pkg/dockerregistry/server/repositorymiddleware.go b/pkg/dockerregistry/server/repositorymiddleware.go index 81fd7f6c9f5b..7ce25ce17e0c 100644 --- a/pkg/dockerregistry/server/repositorymiddleware.go +++ b/pkg/dockerregistry/server/repositorymiddleware.go @@ -64,7 +64,7 @@ var ( ) func init() { - cache, err := newDigestToRepositoryCache(1024) + cache, err := newDigestToRepositoryCache(2048) if err != nil { panic(err) } @@ -171,16 +171,6 @@ func newRepositoryWithClient( }, nil } -func getBoolOption(name 
string, defval bool, options map[string]interface{}) bool { - if value, ok := options[name]; ok { - var b bool - if b, ok = value.(bool); ok { - return b - } - } - return defval -} - // Manifests returns r, which implements distribution.ManifestService. func (r *repository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { if r.ctx == ctx { @@ -271,7 +261,13 @@ func (r *repository) Get(ctx context.Context, dgst digest.Digest, options ...dis } ref := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name, Registry: r.registryAddr} - manifest, err := r.manifestFromImageWithCachedLayers(image, ref.DockerClientDefaults().Exact()) + if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; managed == "true" { + ref.Registry = "" + } else { + ref = ref.DockerClientDefaults() + } + + manifest, err := r.manifestFromImageWithCachedLayers(image, ref.Exact()) return manifest, err } @@ -485,6 +481,7 @@ func (r *repository) Delete(ctx context.Context, dgst digest.Digest) error { if err != nil { return err } + ctx = WithRepository(ctx, r) return ms.Delete(ctx, dgst) } @@ -516,11 +513,29 @@ func (r *repository) getImageStreamImage(dgst digest.Digest) (*imageapi.ImageStr return r.registryOSClient.ImageStreamImages(r.namespace).Get(r.name, dgst.String()) } -// rememberLayers caches the provided layers -func (r *repository) rememberLayers(manifest distribution.Manifest, cacheName string) { - if !r.pullthrough { +// rememberLayersOfImage caches the layer digests of given image +func (r *repository) rememberLayersOfImage(image *imageapi.Image, cacheName string) { + if len(image.DockerImageLayers) == 0 && len(image.DockerImageManifestMediaType) > 0 { + // image has no layers return } + + if len(image.DockerImageLayers) > 0 { + for _, layer := range image.DockerImageLayers { + r.cachedLayers.RememberDigest(digest.Digest(layer.Name), cacheName) + } + return + } + + manifest, err := 
r.getManifestFromImage(image) + if err != nil { + context.GetLogger(r.ctx).Errorf("cannot remember layers of image %q: %v", image.Name, err) + } + r.rememberLayersOfManifest(manifest, cacheName) +} + +// rememberLayersOfManifest caches the layer digests of given manifest +func (r *repository) rememberLayersOfManifest(manifest distribution.Manifest, cacheName string) { // remember the layers in the cache as an optimization to avoid searching all remote repositories for _, layer := range manifest.References() { r.cachedLayers.RememberDigest(layer.Digest, cacheName) @@ -529,17 +544,25 @@ func (r *repository) rememberLayers(manifest distribution.Manifest, cacheName st // manifestFromImageWithCachedLayers loads the image and then caches any located layers func (r *repository) manifestFromImageWithCachedLayers(image *imageapi.Image, cacheName string) (manifest distribution.Manifest, err error) { - if image.DockerImageManifestMediaType == schema2.MediaTypeManifest { - manifest, err = r.deserializedManifestFromImage(image) - } else { - manifest, err = r.signedManifestFromImage(image) - } - + manifest, err = r.getManifestFromImage(image) if err != nil { return } - r.rememberLayers(manifest, cacheName) + r.rememberLayersOfManifest(manifest, cacheName) + return +} + +// getManifestFromImage returns a manifest object constructed from a blob stored in the given image. 
+func (r *repository) getManifestFromImage(image *imageapi.Image) (manifest distribution.Manifest, err error) { + switch image.DockerImageManifestMediaType { + case schema2.MediaTypeManifest: + manifest, err = deserializedManifestFromImage(image) + case schema1.MediaTypeManifest, "": + manifest, err = r.signedManifestFromImage(image) + default: + err = regapi.ErrorCodeManifestInvalid.WithDetail(fmt.Errorf("unknown manifest media type %q", image.DockerImageManifestMediaType)) + } return } diff --git a/pkg/dockerregistry/server/util.go b/pkg/dockerregistry/server/util.go new file mode 100644 index 000000000000..68082150df17 --- /dev/null +++ b/pkg/dockerregistry/server/util.go @@ -0,0 +1,73 @@ +package server + +import ( + "encoding/json" + "strings" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/manifest/schema2" + + imageapi "github.com/openshift/origin/pkg/image/api" +) + +// Context keys +const ( + // repositoryKey serves to store/retrieve repository object to/from context. + repositoryKey = "openshift.repository" +) + +func WithRepository(parent context.Context, repo *repository) context.Context { + return context.WithValue(parent, repositoryKey, repo) +} +func RepositoryFrom(ctx context.Context) (repo *repository, found bool) { + repo, found = ctx.Value(repositoryKey).(*repository) + return +} + +func getBoolOption(name string, defval bool, options map[string]interface{}) bool { + if value, ok := options[name]; ok { + var b bool + if b, ok = value.(bool); ok { + return b + } + } + return defval +} + +// deserializedManifestFromImage converts an Image to a DeserializedManifest. 
+func deserializedManifestFromImage(image *imageapi.Image) (*schema2.DeserializedManifest, error) { + var manifest schema2.DeserializedManifest + if err := json.Unmarshal([]byte(image.DockerImageManifest), &manifest); err != nil { + return nil, err + } + return &manifest, nil +} + +func getNamespaceName(resourceName string) (string, string, error) { + repoParts := strings.SplitN(resourceName, "/", 2) + if len(repoParts) != 2 { + return "", "", ErrNamespaceRequired + } + ns := repoParts[0] + if len(ns) == 0 { + return "", "", ErrNamespaceRequired + } + name := repoParts[1] + if len(name) == 0 { + return "", "", ErrNamespaceRequired + } + return ns, name, nil +} + +// effectiveCreateOptions finds out what the blob creation options are going to do by dry-running them. +func effectiveCreateOptions(options []distribution.BlobCreateOption) (*distribution.CreateOptions, error) { + opts := &distribution.CreateOptions{} + for _, createOptions := range options { + err := createOptions.Apply(opts) + if err != nil { + return nil, err + } + } + return opts, nil +} From 46ce2ac8f31a19aab587f14623f2c6b44a229417 Mon Sep 17 00:00:00 2001 From: Michal Minar Date: Fri, 15 Jul 2016 16:44:05 +0200 Subject: [PATCH 06/12] Cache blob <-> repository entries in registry with TTL For security reasons, evict stale pairs of (blob, repository) from cache. So when an image is untagged from image stream, registry will deny access to its blobs. Also fixed a bug in digestcache where the first association for a particular digest was lost.
--- .../server/blobdescriptorservice.go | 43 +- pkg/dockerregistry/server/digestcache.go | 152 ++++- pkg/dockerregistry/server/digestcache_test.go | 556 ++++++++++++++++++ .../server/pullthroughblobstore.go | 2 +- .../server/repositorymiddleware.go | 37 +- 5 files changed, 739 insertions(+), 51 deletions(-) create mode 100644 pkg/dockerregistry/server/digestcache_test.go diff --git a/pkg/dockerregistry/server/blobdescriptorservice.go b/pkg/dockerregistry/server/blobdescriptorservice.go index 825c1293878a..bb282ed5ca7f 100644 --- a/pkg/dockerregistry/server/blobdescriptorservice.go +++ b/pkg/dockerregistry/server/blobdescriptorservice.go @@ -1,6 +1,7 @@ package server import ( + "fmt" "sort" "github.com/docker/distribution" @@ -42,25 +43,32 @@ type blobDescriptorService struct { // corresponding image stream. This method is invoked from inside of upstream's linkedBlobStore. It expects // a proper repository object to be set on given context by upper openshift middleware wrappers. func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + repo, found := RepositoryFrom(ctx) + if !found || repo == nil { + err := fmt.Errorf("failed to retrieve repository from context") + context.GetLogger(ctx).Error(err) + return distribution.Descriptor{}, err + } + // if there is a repo layer link, return its descriptor - desc, statErr := bs.BlobDescriptorService.Stat(ctx, dgst) - if statErr == nil { + desc, err := bs.BlobDescriptorService.Stat(ctx, dgst) + if err == nil { + // and remember the association + repo.cachedLayers.RememberDigest(dgst, repo.blobrepositorycachettl, imageapi.DockerImageReference{ + Namespace: repo.namespace, + Name: repo.name, + }.Exact()) return desc, nil } + context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), err) + // verify the blob is stored locally - desc, err := dockerRegistry.BlobStatter().Stat(ctx, dgst) + desc, err = 
dockerRegistry.BlobStatter().Stat(ctx, dgst) if err != nil { return desc, err } - // verify that an image holding this blob is actually present in the image stream - repo, found := RepositoryFrom(ctx) - if !found { - context.GetLogger(ctx).Errorf("failed to retrieve repository from context") - return distribution.Descriptor{}, statErr - } - if imageStreamHasBlob(repo, dgst) { return desc, nil } @@ -68,6 +76,21 @@ func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) ( return distribution.Descriptor{}, distribution.ErrBlobUnknown } +func (bs *blobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error { + repo, found := RepositoryFrom(ctx) + if !found || repo == nil { + err := fmt.Errorf("failed to retrieve repository from context") + context.GetLogger(ctx).Error(err) + return err + } + + repo.cachedLayers.ForgetDigest(dgst, imageapi.DockerImageReference{ + Namespace: repo.namespace, + Name: repo.name, + }.Exact()) + return bs.BlobDescriptorService.Clear(ctx, dgst) +} + // imageStreamHasBlob returns true if the given blob digest is referenced in image stream corresponding to // given repository. If not found locally, image stream's images will be iterated and fetched from newest to // oldest until found. Each processed image will update local cache of blobs. diff --git a/pkg/dockerregistry/server/digestcache.go b/pkg/dockerregistry/server/digestcache.go index 8b1c946b812a..141ceb8e87ca 100644 --- a/pkg/dockerregistry/server/digestcache.go +++ b/pkg/dockerregistry/server/digestcache.go @@ -2,6 +2,7 @@ package server import ( "sync" + "time" "github.com/hashicorp/golang-lru" @@ -26,20 +27,36 @@ func newDigestToRepositoryCache(size int) (digestToRepositoryCache, error) { return digestToRepositoryCache{Cache: c}, nil } -const bucketSize = 10 +const bucketSize = 16 // RememberDigest associates a digest with a repository. 
-func (c digestToRepositoryCache) RememberDigest(dgst digest.Digest, repo string) { +func (c digestToRepositoryCache) RememberDigest(dgst digest.Digest, ttl time.Duration, repo string) { key := dgst.String() value, ok := c.Get(key) if !ok { value = &repositoryBucket{} - if ok, _ := c.ContainsOrAdd(key, value); !ok { - return + if ok, _ := c.ContainsOrAdd(key, value); ok { + // the value exists now, get it + value, ok = c.Get(key) + if !ok { + // should not happen + return + } } } repos := value.(*repositoryBucket) - repos.Add(repo) + repos.Add(ttl, repo) +} + +// ForgetDigest removes an association between given digest and repository from the cache. +func (c digestToRepositoryCache) ForgetDigest(dgst digest.Digest, repo string) { + key := dgst.String() + value, ok := c.Get(key) + if !ok { + return + } + repos := value.(*repositoryBucket) + repos.Remove(repo) } // RepositoriesForDigest returns a list of repositories that may contain this digest. @@ -52,42 +69,123 @@ func (c digestToRepositoryCache) RepositoriesForDigest(dgst digest.Digest) []str return repos.Copy() } +// repositoryBucket contains a list of repositories with eviction timeouts. type repositoryBucket struct { mu sync.Mutex - list []string + list []bucketEntry } // Has returns true if the bucket contains this repository. -func (i *repositoryBucket) Has(repo string) bool { - i.mu.Lock() - defer i.mu.Unlock() - for _, s := range i.list { - if s == repo { - return true +func (b *repositoryBucket) Has(repo string) bool { + b.mu.Lock() + defer b.mu.Unlock() + + b.evictStale(time.Now()) + + return b.getIndexOf(repo) >= 0 +} + +// Add one or more repositories to this bucket. 
+func (b *repositoryBucket) Add(ttl time.Duration, repos ...string) { + b.mu.Lock() + defer b.mu.Unlock() + + now := time.Now() + b.evictStale(now) + evictOn := now.Add(ttl) + + for _, repo := range repos { + index := b.getIndexOf(repo) + arr := b.list + + if index >= 0 { + // repository already exists, move it to the end with highest eviction time + entry := arr[index] + copy(arr[index:], arr[index+1:]) + if entry.evictOn.Before(evictOn) { + entry.evictOn = evictOn + } + arr[len(arr)-1] = entry + + } else { + // repo is a new entry + if len(arr) >= bucketSize { + arr = arr[1:] + } + arr = append(arr, bucketEntry{ + repository: repo, + evictOn: evictOn, + }) } + + b.list = arr } - return false } -// Add one or more repositories to this bucket. -func (i *repositoryBucket) Add(repos ...string) { - i.mu.Lock() - defer i.mu.Unlock() - arr := i.list +// Remove removes all the given repos from repository bucket. +func (b *repositoryBucket) Remove(repos ...string) { + b.mu.Lock() + defer b.mu.Unlock() + for _, repo := range repos { - if len(arr) >= bucketSize { - arr = arr[1:] + index := b.getIndexOf(repo) + + if index >= 0 { + copy(b.list[index:], b.list[index+1:]) + b.list = b.list[:len(b.list)-1] + } + } +} + +// getIndexOf returns an index of given repository in bucket's array. If not found, -1 will be returned. +func (b *repositoryBucket) getIndexOf(repo string) int { + arr := b.list + + for i := 0; i < len(arr); i++ { + if arr[i].repository == repo { + return i + } + } + + return -1 +} + +// evictStale removes entries whose eviction time precedes the given now. +func (b *repositoryBucket) evictStale(now time.Time) { + arr := b.list + j := 0 + + for i := 0; i < len(arr); i++ { + if arr[i].evictOn.Before(now) { + continue + } + if i > j { + arr[j] = arr[i] } - arr = append(arr, repo) + j++ + } + + if j < len(arr) { + b.list = arr[0:j] } - i.list = arr } // Copy returns a copy of the contents of this bucket in a threadsafe fasion.
-func (i *repositoryBucket) Copy() []string { - i.mu.Lock() - defer i.mu.Unlock() - out := make([]string, len(i.list)) - copy(out, i.list) +func (b *repositoryBucket) Copy() []string { + b.mu.Lock() + defer b.mu.Unlock() + + b.evictStale(time.Now()) + + out := make([]string, len(b.list)) + for i, e := range b.list { + out[i] = e.repository + } return out } + +// bucketEntry holds a repository name with eviction timeout. +type bucketEntry struct { + repository string + evictOn time.Time +} diff --git a/pkg/dockerregistry/server/digestcache_test.go b/pkg/dockerregistry/server/digestcache_test.go new file mode 100644 index 000000000000..263e4cb8d0bc --- /dev/null +++ b/pkg/dockerregistry/server/digestcache_test.go @@ -0,0 +1,556 @@ +package server + +import ( + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/kubernetes/pkg/util/diff" +) + +const ( + allowedDeviation = time.Millisecond * 10 + + ttl1m = time.Minute + ttl5m = time.Minute * 5 + ttl8m = time.Minute * 8 +) + +var stale = time.Unix(0, 0) + +func TestRepositoryBucketAdd(t *testing.T) { + now := time.Now() + + generated := make([]bucketEntry, bucketSize) + for i := 0; i < bucketSize; i++ { + generated[i] = bucketEntry{ + repository: fmt.Sprintf("gen%d", i), + evictOn: now.Add(ttl5m), + } + } + + for _, tc := range []struct { + name string + ttl time.Duration + repos []string + entries []bucketEntry + expectedEntries []bucketEntry + }{ + { + name: "no existing entries", + ttl: ttl5m, + repos: []string{"a", "b"}, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "no entries to add", + ttl: ttl5m, + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + 
name: "add few new entries", + ttl: ttl8m, + repos: []string{"bmw", "audi"}, + entries: []bucketEntry{ + { + repository: "skoda", + evictOn: now.Add(ttl5m), + }, + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "skoda", + evictOn: now.Add(ttl5m), + }, + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + { + repository: "bmw", + evictOn: now.Add(ttl8m), + }, + { + repository: "audi", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "add existing entry with higher ttl", + ttl: ttl8m, + repos: []string{"apple"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + { + repository: "apple", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "add existing entry with lower ttl", + ttl: ttl5m, + repos: []string{"orange"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "add new entry with eviction", + ttl: ttl5m, + repos: []string{"banana"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: stale, + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + { + 
repository: "banana", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "all stale", + ttl: ttl5m, + repos: []string{"banana"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: stale, + }, + { + repository: "apple", + evictOn: stale, + }, + { + repository: "pear", + evictOn: stale, + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "banana", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "add multiple entries with middle ttl", + ttl: ttl5m, + repos: []string{"apple", "banana", "peach", "orange"}, + entries: []bucketEntry{ + { + repository: "melon", + evictOn: stale, + }, + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl1m), + }, + { + repository: "plum", + evictOn: stale, + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "pear", + evictOn: now.Add(ttl1m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "banana", + evictOn: now.Add(ttl5m), + }, + { + repository: "peach", + evictOn: now.Add(ttl5m), + }, + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + }, + }, + + { + name: "over bucket size", + ttl: ttl1m, + repos: []string{"new1", generated[2].repository, "new2", generated[4].repository}, + entries: generated, + expectedEntries: append( + append([]bucketEntry{generated[3]}, generated[5:bucketSize]...), + bucketEntry{repository: "new1", evictOn: now.Add(ttl1m)}, + generated[2], + bucketEntry{repository: "new2", evictOn: now.Add(ttl1m)}, + generated[4]), + }, + } { + b := repositoryBucket{ + list: tc.entries, + } + b.Add(tc.ttl, tc.repos...) 
+ + if len(b.list) != len(tc.expectedEntries) { + t.Errorf("[%s] got unexpected number of entries in bucket: %d != %d", tc.name, len(b.list), len(tc.expectedEntries)) + } + for i := 0; i < len(b.list); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v", tc.name, i, b.list[i]) + continue + } + a, b := b.list[i], tc.expectedEntries[i] + if !bucketEntriesEqual(a, b) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v != %#+v", tc.name, i, a, b) + } + } + for i := len(b.list); i < len(tc.expectedEntries); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d missing expected entry %#+v", tc.name, i, tc.expectedEntries[i]) + } + } + } +} + +func TestRepositoryBucketRemove(t *testing.T) { + now := time.Now() + + for _, tc := range []struct { + name string + repos []string + entries []bucketEntry + expectedEntries []bucketEntry + }{ + { + name: "no existing entries", + repos: []string{"a", "b"}, + }, + + { + name: "no matching entries", + repos: []string{"c", "d"}, + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "no entries to remove", + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "remove one matching", + repos: []string{"bmw", "skoda"}, + entries: []bucketEntry{ + { + repository: "skoda", + evictOn: now.Add(ttl5m), + }, + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "ford", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: 
"remove, no eviction", + repos: []string{"pear"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: stale, + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "orange", + evictOn: stale, + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + }, + }, + + { + name: "remove multiple matching", + repos: []string{"orange", "apple"}, + entries: []bucketEntry{ + { + repository: "orange", + evictOn: now.Add(ttl8m), + }, + { + repository: "apple", + evictOn: now.Add(ttl5m), + }, + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + expectedEntries: []bucketEntry{ + { + repository: "pear", + evictOn: now.Add(ttl5m), + }, + }, + }, + } { + b := repositoryBucket{ + list: tc.entries, + } + b.Remove(tc.repos...) + + if len(b.list) != len(tc.expectedEntries) { + t.Errorf("[%s] got unexpected number of entries in bucket: %d != %d", tc.name, len(b.list), len(tc.expectedEntries)) + } + for i := 0; i < len(b.list); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v", tc.name, i, b.list[i]) + continue + } + a, b := b.list[i], tc.expectedEntries[i] + if !bucketEntriesEqual(a, b) { + t.Errorf("[%s] index=%d got unexpected entry: %#+v != %#+v", tc.name, i, a, b) + } + } + for i := len(b.list); i < len(tc.expectedEntries); i++ { + if i >= len(tc.expectedEntries) { + t.Errorf("[%s] index=%d missing expected entry %#+v", tc.name, i, tc.expectedEntries[i]) + } + } + } +} + +func TestRepositoryBucketCopy(t *testing.T) { + stale := time.Unix(0, 0) + now := time.Now() + + ttl5m := time.Minute * 5 + for _, tc := range []struct { + name string + entries []bucketEntry + expectedRepos []string + }{ + { + name: "no entry", + expectedRepos: []string{}, + }, + + { + name: "one stale entry", + entries: []bucketEntry{ + { + repository: "1", + evictOn: stale, + }, + }, + expectedRepos: []string{}, + 
}, + + { + name: "two entries", + entries: []bucketEntry{ + { + repository: "a", + evictOn: now.Add(ttl5m), + }, + { + repository: "b", + evictOn: now.Add(ttl5m), + }, + }, + expectedRepos: []string{"a", "b"}, + }, + } { + b := repositoryBucket{ + list: tc.entries, + } + result := b.Copy() + + if !reflect.DeepEqual(result, tc.expectedRepos) { + t.Errorf("[%s] got unexpected repo list: %s", tc.name, diff.ObjectGoPrintDiff(result, tc.expectedRepos)) + } + } +} + +func bucketEntriesEqual(a, b bucketEntry) bool { + if a.repository != b.repository { + return false + } + if a.evictOn.After(b.evictOn.Add(allowedDeviation)) || b.evictOn.After(a.evictOn.Add(allowedDeviation)) { + return false + } + return true +} diff --git a/pkg/dockerregistry/server/pullthroughblobstore.go b/pkg/dockerregistry/server/pullthroughblobstore.go index 75b5778e762e..c54d4b072d54 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore.go +++ b/pkg/dockerregistry/server/pullthroughblobstore.go @@ -160,7 +160,7 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear if err != nil { continue } - r.repo.cachedLayers.RememberDigest(dgst, repo) + r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo) context.GetLogger(r.repo.ctx).Infof("Found digest location by search %q in %q: %v", dgst, repo, err) return desc, nil } diff --git a/pkg/dockerregistry/server/repositorymiddleware.go b/pkg/dockerregistry/server/repositorymiddleware.go index 7ce25ce17e0c..73db29c326b6 100644 --- a/pkg/dockerregistry/server/repositorymiddleware.go +++ b/pkg/dockerregistry/server/repositorymiddleware.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "strings" + "time" "github.com/docker/distribution" "github.com/docker/distribution/context" @@ -29,6 +30,8 @@ import ( ) const ( + // Environment variables + // DockerRegistryURLEnvVar is a mandatory environment variable name specifying url of internal docker // registry. 
All references to pushed images will be prefixed with its value. DockerRegistryURLEnvVar = "DOCKER_REGISTRY_URL" @@ -46,6 +49,11 @@ const ( // AcceptSchema2EnvVar is a boolean environment variable that allows to accept manifest schema v2 // on manifest put requests. AcceptSchema2EnvVar = "REGISTRY_MIDDLEWARE_REPOSITORY_OPENSHIFT_ACCEPTSCHEMA2" + + // Default values + + defaultDigestToRepositoryCacheSize = 2048 + defaultBlobRepositoryCacheTTL = time.Minute * 10 ) var ( @@ -64,7 +72,7 @@ var ( ) func init() { - cache, err := newDigestToRepositoryCache(2048) + cache, err := newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize) if err != nil { panic(err) } @@ -118,6 +126,8 @@ type repository struct { pullthrough bool // acceptschema2 allows to refuse the manifest schema version 2 acceptschema2 bool + // blobrepositorycachettl is an eviction timeout for entries of cachedLayers + blobrepositorycachettl time.Duration // cachedLayers remembers a mapping of layer digest to repositories recently seen with that image to avoid // having to check every potential upstream repository when a blob request is made. The cache is useful only // when session affinity is on for the registry, but in practice the first pull will fill the cache. 
@@ -158,16 +168,17 @@ func newRepositoryWithClient( return &repository{ Repository: repo, - ctx: ctx, - quotaClient: quotaClient, - limitClient: limitClient, - registryOSClient: registryOSClient, - registryAddr: registryAddr, - namespace: nameParts[0], - name: nameParts[1], - pullthrough: pullthrough, - acceptschema2: acceptschema2, - cachedLayers: cachedLayers, + ctx: ctx, + quotaClient: quotaClient, + limitClient: limitClient, + registryOSClient: registryOSClient, + registryAddr: registryAddr, + namespace: nameParts[0], + name: nameParts[1], + acceptschema2: acceptschema2, + blobrepositorycachettl: defaultBlobRepositoryCacheTTL, + pullthrough: pullthrough, + cachedLayers: cachedLayers, }, nil } @@ -522,7 +533,7 @@ func (r *repository) rememberLayersOfImage(image *imageapi.Image, cacheName stri if len(image.DockerImageLayers) > 0 { for _, layer := range image.DockerImageLayers { - r.cachedLayers.RememberDigest(digest.Digest(layer.Name), cacheName) + r.cachedLayers.RememberDigest(digest.Digest(layer.Name), r.blobrepositorycachettl, cacheName) } return } @@ -538,7 +549,7 @@ func (r *repository) rememberLayersOfImage(image *imageapi.Image, cacheName stri func (r *repository) rememberLayersOfManifest(manifest distribution.Manifest, cacheName string) { // remember the layers in the cache as an optimization to avoid searching all remote repositories for _, layer := range manifest.References() { - r.cachedLayers.RememberDigest(layer.Digest, cacheName) + r.cachedLayers.RememberDigest(layer.Digest, r.blobrepositorycachettl, cacheName) } } From 19f594b7e2c2830eaaed6acf9c45e34615c00238 Mon Sep 17 00:00:00 2001 From: Michal Minar Date: Sat, 16 Jul 2016 16:32:12 +0200 Subject: [PATCH 07/12] Configurable blobrepositorycachettl value Turned blob repository ttl into a config option. Also allowed for overrides using env var REGISTRY_MIDDLEWARE_REPOSITORY_OPENSHIFT_BLOBREPOSITORYCACHETTL. 
Signed-off-by: Michal Minar --- images/dockerregistry/config.yml | 1 + .../server/quotarestrictedblobstore.go | 39 +-- .../server/repositorymiddleware.go | 29 ++- pkg/dockerregistry/server/util.go | 76 +++++- pkg/dockerregistry/server/util_test.go | 233 ++++++++++++++++++ 5 files changed, 330 insertions(+), 48 deletions(-) create mode 100644 pkg/dockerregistry/server/util_test.go diff --git a/images/dockerregistry/config.yml b/images/dockerregistry/config.yml index 90b3dfdc8174..4e1ef8b048ce 100644 --- a/images/dockerregistry/config.yml +++ b/images/dockerregistry/config.yml @@ -23,5 +23,6 @@ middleware: pullthrough: true enforcequota: false projectcachettl: 1m + blobrepositorycachettl: 10m storage: - name: openshift diff --git a/pkg/dockerregistry/server/quotarestrictedblobstore.go b/pkg/dockerregistry/server/quotarestrictedblobstore.go index 5173e3259d8c..6db162c777cc 100644 --- a/pkg/dockerregistry/server/quotarestrictedblobstore.go +++ b/pkg/dockerregistry/server/quotarestrictedblobstore.go @@ -13,7 +13,6 @@ package server import ( - "fmt" "time" "github.com/Sirupsen/logrus" @@ -33,32 +32,11 @@ const ( // timeout. Caches will only be initialized if the given ttl is positive. Options are gathered from // configuration file and will be overriden by enforceQuota and projectCacheTTL environment variable values. 
func newQuotaEnforcingConfig(ctx context.Context, enforceQuota, projectCacheTTL string, options map[string]interface{}) *quotaEnforcingConfig { - buildOptionValues := func(optionName string, override string) []string { - optValues := []string{} - if value, ok := options[optionName]; ok { - var res string - switch v := value.(type) { - case string: - res = v - case bool: - res = fmt.Sprintf("%t", v) - default: - res = fmt.Sprintf("%v", v) - } - if len(res) > 0 { - optValues = append(optValues, res) - } - } - if len(override) > 0 { - optValues = append(optValues, override) - } - return optValues + enforce, err := getBoolOption(EnforceQuotaEnvVar, "enforcequota", false, options) + if err != nil { + logrus.Error(err) } - enforce := false - for _, s := range buildOptionValues("enforcequota", enforceQuota) { - enforce = s == "true" - } if !enforce { context.GetLogger(ctx).Info("quota enforcement disabled") return &quotaEnforcingConfig{ @@ -67,14 +45,9 @@ func newQuotaEnforcingConfig( } } - ttl := defaultProjectCacheTTL - for _, s := range buildOptionValues("projectcachettl", projectCacheTTL) { - parsed, err := time.ParseDuration(s) - if err != nil { - logrus.Errorf("failed to parse project cache ttl %q: %v", s, err) - continue - } - ttl = parsed + ttl, err := getDurationOption(ProjectCacheTTLEnvVar, "projectcachettl", defaultProjectCacheTTL, options) + if err != nil { + logrus.Error(err) } if ttl <= 0 { diff --git a/pkg/dockerregistry/server/repositorymiddleware.go b/pkg/dockerregistry/server/repositorymiddleware.go index 73db29c326b6..65260ae68ba4 100644 --- a/pkg/dockerregistry/server/repositorymiddleware.go +++ b/pkg/dockerregistry/server/repositorymiddleware.go @@ -50,6 +50,11 @@ const ( // AcceptSchema2EnvVar is a boolean environment variable that allows to accept manifest schema v2 // on manifest put requests. AcceptSchema2EnvVar = "REGISTRY_MIDDLEWARE_REPOSITORY_OPENSHIFT_ACCEPTSCHEMA2" + // BlobRepositoryCacheTTLEnvVar is an environment variable specifying an eviction timeout for entries.
The higher the value, the faster queries but also a higher risk of + // leaking a blob that is no longer tagged in given repository. + BlobRepositoryCacheTTLEnvVar = "REGISTRY_MIDDLEWARE_REPOSITORY_OPENSHIFT_BLOBREPOSITORYCACHETTL" + // Default values defaultDigestToRepositoryCacheSize = 2048 @@ -97,6 +102,7 @@ func init() { if quotaEnforcing == nil { quotaEnforcing = newQuotaEnforcingConfig(ctx, os.Getenv(EnforceQuotaEnvVar), os.Getenv(ProjectCacheTTLEnvVar), options) } + return newRepositoryWithClient(registryOSClient, kClient, kClient, ctx, repo, options) }, ) @@ -150,14 +156,17 @@ func newRepositoryWithClient( return nil, fmt.Errorf("%s is required", DockerRegistryURLEnvVar) } - pullthrough := getBoolOption("pullthrough", false, options) - - acceptschema2 := false - - if os.Getenv(AcceptSchema2EnvVar) != "" { - acceptschema2 = os.Getenv(AcceptSchema2EnvVar) == "true" - } else { - acceptschema2 = getBoolOption("acceptschema2", false, options) + acceptschema2, err := getBoolOption(AcceptSchema2EnvVar, "acceptschema2", false, options) + if err != nil { + context.GetLogger(ctx).Error(err) + } + blobrepositorycachettl, err := getDurationOption(BlobRepositoryCacheTTLEnvVar, "blobrepositorycachettl", defaultBlobRepositoryCacheTTL, options) + if err != nil { + context.GetLogger(ctx).Error(err) + } + pullthrough, err := getBoolOption("", "pullthrough", false, options) + if err != nil { + context.GetLogger(ctx).Error(err) } nameParts := strings.SplitN(repo.Named().Name(), "/", 2) @@ -165,6 +174,8 @@ func newRepositoryWithClient( return nil, fmt.Errorf("invalid repository name %q: it must be of the format /", repo.Named().Name()) } + context.GetLogger(ctx).Debugf(`making openshift repository [is=%q, acceptschema2=%t, pullthrough=%t]`, repo.Named().Name(), acceptschema2, pullthrough) + return &repository{ Repository: repo, @@ -176,7 +187,7 @@ func newRepositoryWithClient( namespace: nameParts[0], name: nameParts[1], acceptschema2: acceptschema2, - blobrepositorycachettl: 
defaultBlobRepositoryCacheTTL, + blobrepositorycachettl: blobrepositorycachettl, pullthrough: pullthrough, cachedLayers: cachedLayers, }, nil diff --git a/pkg/dockerregistry/server/util.go b/pkg/dockerregistry/server/util.go index 68082150df17..9f5fdfd3e585 100644 --- a/pkg/dockerregistry/server/util.go +++ b/pkg/dockerregistry/server/util.go @@ -2,7 +2,10 @@ package server import ( "encoding/json" + "fmt" + "os" "strings" + "time" "github.com/docker/distribution" "github.com/docker/distribution/context" @@ -25,14 +28,75 @@ func RepositoryFrom(ctx context.Context) (repo *repository, found bool) { return } -func getBoolOption(name string, defval bool, options map[string]interface{}) bool { - if value, ok := options[name]; ok { - var b bool - if b, ok = value.(bool); ok { - return b +func getOptionValue( + envVar string, + optionName string, + defval interface{}, + options map[string]interface{}, + conversionFunc func(v interface{}) (interface{}, error), +) (value interface{}, err error) { + value = defval + if optValue, ok := options[optionName]; ok { + converted, convErr := conversionFunc(optValue) + if convErr != nil { + err = fmt.Errorf("config option %q: invalid value: %v", optionName, convErr) + } else { + value = converted } } - return defval + + if len(envVar) == 0 { + return + } + envValue := os.Getenv(envVar) + if len(envValue) == 0 { + return + } + + converted, convErr := conversionFunc(envValue) + if convErr != nil { + err = fmt.Errorf("invalid value of environment variable %s: %v", envVar, convErr) + } else { + value = converted + } + + return +} + +func getBoolOption(envVar string, optionName string, defval bool, options map[string]interface{}) (bool, error) { + value, err := getOptionValue(envVar, optionName, defval, options, func(value interface{}) (b interface{}, err error) { + switch t := value.(type) { + case bool: + return t, nil + case string: + switch strings.ToLower(t) { + case "true": + return true, nil + case "false": + return false, nil + } 
+ } + return defval, fmt.Errorf("%#+v is not a boolean", value) + }) + + return value.(bool), err +} + +func getDurationOption(envVar string, optionName string, defval time.Duration, options map[string]interface{}) (time.Duration, error) { + value, err := getOptionValue(envVar, optionName, defval, options, func(value interface{}) (d interface{}, err error) { + s, ok := value.(string) + if !ok { + return defval, fmt.Errorf("expected string, not %T", value) + } + + parsed, err := time.ParseDuration(s) + if err != nil { + return defval, fmt.Errorf("parse duration error: %v", err) + } + return parsed, nil + }) + + return value.(time.Duration), err } // deserializedManifestFromImage converts an Image to a DeserializedManifest. diff --git a/pkg/dockerregistry/server/util_test.go b/pkg/dockerregistry/server/util_test.go new file mode 100644 index 000000000000..79bd8f9d4485 --- /dev/null +++ b/pkg/dockerregistry/server/util_test.go @@ -0,0 +1,233 @@ +package server + +import ( + "os" + "testing" + "time" +) + +func TestGetBoolOption(t *testing.T) { + for _, tc := range []struct { + name string + envName string + exportEnv map[string]string + option string + options map[string]interface{} + defaultValue bool + expected bool + expectedError bool + }{ + { + name: "default to false", + defaultValue: false, + option: "opt", + expected: false, + }, + + { + name: "default to true", + defaultValue: true, + option: "opt", + envName: "VBOOL", + expected: true, + }, + + { + name: "given option", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "true"}, + expected: true, + }, + + { + name: "env value with missing option", + defaultValue: true, + option: "opt", + envName: "VBOOL", + exportEnv: map[string]string{"VBOOL": "false"}, + expected: false, + }, + + { + name: "given option and env var", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "false"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "true"}, + 
expected: true, + }, + + { + name: "disable with env var", + defaultValue: true, + option: "opt", + options: map[string]interface{}{"opt": "true"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "false"}, + expected: false, + }, + + { + name: "given option with bad env value", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "true"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "falsed"}, + expected: true, + expectedError: true, + }, + + { + name: "env value with wrong option type", + defaultValue: true, + option: "opt", + options: map[string]interface{}{"opt": 1}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "true"}, + expected: true, + expectedError: true, + }, + + { + name: "env value with bad option value", + defaultValue: true, + option: "opt", + options: map[string]interface{}{"opt": "falsed"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "false"}, + expected: false, + expectedError: true, + }, + + { + name: "bad env value with bad option value", + defaultValue: false, + option: "opt", + options: map[string]interface{}{"opt": "turk"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "truth"}, + expected: false, + expectedError: true, + }, + } { + for key, value := range tc.exportEnv { + os.Setenv(key, value) + } + d, err := getBoolOption(tc.envName, tc.option, tc.defaultValue, tc.options) + if err == nil && tc.expectedError { + t.Errorf("[%s] unexpected non-error", tc.name) + } else if err != nil && !tc.expectedError { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + } + if d != tc.expected { + t.Errorf("[%s] got unexpected duration: %t != %t", tc.name, d, tc.expected) + } + } +} + +func TestGetDurationOption(t *testing.T) { + for _, tc := range []struct { + name string + envName string + exportEnv map[string]string + option string + options map[string]interface{} + defaultValue time.Duration + expectedDuration time.Duration + expectedError bool + }{ + { + name: "no 
option, no env", + defaultValue: time.Minute, + option: "opt", + expectedDuration: time.Minute, + }, + + { + name: "given option", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "4000ns"}, + expectedDuration: 4000, + }, + + { + name: "env value with missing option", + defaultValue: time.Minute, + option: "opt", + envName: "VAR", + exportEnv: map[string]string{"VAR": "1s"}, + expectedDuration: time.Second, + }, + + { + name: "given option and env var", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "4000us"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "1s"}, + expectedDuration: time.Second, + }, + + { + name: "given option with bad env value", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "1s"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "bad"}, + expectedDuration: time.Second, + expectedError: true, + }, + + { + name: "env value with wrong option type", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": false}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "2s"}, + expectedDuration: time.Second * 2, + expectedError: true, + }, + + { + name: "env value with bad option value", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "bad"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "2s"}, + expectedDuration: time.Second * 2, + expectedError: true, + }, + + { + name: "bad env value with bad option value", + defaultValue: time.Minute, + option: "opt", + options: map[string]interface{}{"opt": "bad"}, + envName: "VAR", + exportEnv: map[string]string{"VAR": "bad"}, + expectedDuration: time.Minute, + expectedError: true, + }, + } { + for key, value := range tc.exportEnv { + os.Setenv(key, value) + } + d, err := getDurationOption(tc.envName, tc.option, tc.defaultValue, tc.options) + if err == nil && tc.expectedError { + t.Errorf("[%s] 
unexpected non-error", tc.name) + } else if err != nil && !tc.expectedError { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + } + if d != tc.expectedDuration { + t.Errorf("[%s] got unexpected duration: %s != %s", tc.name, d.String(), tc.expectedDuration.String()) + } + } +} From fb1b536ae9ca950f967a41d5f987f1b01a76b498 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Wed, 13 Jul 2016 13:34:42 +0200 Subject: [PATCH 08/12] e2e: speed-up docker repository pull tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use much smaller image for pulls. Also Deal with multiple image candidates for deletion. Signed-off-by: Michal Minář --- test/end-to-end/core.sh | 72 ++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/test/end-to-end/core.sh b/test/end-to-end/core.sh index 254d1d947073..ef4ee6cc92eb 100755 --- a/test/end-to-end/core.sh +++ b/test/end-to-end/core.sh @@ -44,6 +44,14 @@ function wait_for_app() { os::cmd::try_until_text "curl -s http://${FRONTEND_IP}:5432/keys/foo" "1337" } +function remove_docker_images() { + local name="$1" + local tag="${2:-\S\+}" + local imageids=$(docker images | sed -n "s,^.*$name\s\+$tag\s\+\(\S\+\).*,\1,p" | sort -u | tr '\n' ' ') + os::cmd::expect_success_and_text "echo '${imageids}' | wc -w" '^[1-9][0-9]*$' + os::cmd::expect_success "docker rmi -f ${imageids}" +} + os::test::junit::declare_suite_start "end-to-end/core" echo "[INFO] openshift version: `openshift version`" @@ -133,51 +141,57 @@ echo "[INFO] Docker pullthrough" os::cmd::expect_success "oc import-image --confirm --from=mysql:latest mysql:pullthrough" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/mysql:pullthrough" -echo "[INFO] Docker start with GCS" +echo "[INFO] Docker registry start with GCS" os::cmd::expect_failure_and_text "docker run -e REGISTRY_STORAGE=\"gcs: {}\" openshift/origin-docker-registry:${TAG}" "No bucket parameter 
provided" +echo "[INFO] Docker pull from istag" +os::cmd::expect_success "oc import-image --confirm --from=hello-world:latest -n test hello-world:pullthrough" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/test/hello-world:pullthrough" +os::cmd::expect_success "docker tag ${DOCKER_REGISTRY}/test/hello-world:pullthrough ${DOCKER_REGISTRY}/cache/hello-world:latest" +os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/cache/hello-world:latest" + # verify we can pull from tagged image (using tag) -imageid=$(docker images | grep centos/ruby-22-centos7 | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7:latest to the same image stream and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7:latest ruby-22-centos7:new-tag" -os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/ruby-22-centos7:new-tag" +remove_docker_images 'cache/hello-world' +echo "[INFO] Tagging hello-world:latest to the same image stream and pulling it" +os::cmd::expect_success "oc tag hello-world:latest hello-world:new-tag" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/hello-world:new-tag" echo "[INFO] The same image stream pull successful" -imageid=$(docker images | grep cache/ruby-22-centos7 | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7:latest to cross repository and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7:latest cross:repo-pull" +remove_docker_images "${DOCKER_REGISTRY}/cache/hello-world" new-tag +echo "[INFO] Tagging hello-world:latest to cross repository and pulling it" +os::cmd::expect_success "oc tag hello-world:latest cross:repo-pull" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/cross:repo-pull" echo "[INFO] Cross repository pull successful" -imageid=$(docker images | grep cache/cross | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging 
ruby-22-centos7:latest to cross namespace and pulling it" -os::cmd::expect_success "oc tag cache/ruby-22-centos7:latest cross:namespace-pull -n custom" +remove_docker_images "${DOCKER_REGISTRY}/cache/cross" "repo-pull" +echo "[INFO] Tagging hello-world:latest to cross namespace and pulling it" +os::cmd::expect_success "oc tag cache/hello-world:latest cross:namespace-pull -n custom" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/custom/cross:namespace-pull" echo "[INFO] Cross namespace pull successful" -# verify we can pull from tagged image (using imageid) -imageid=$(docker images | grep custom/cross | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -tagid=$(oc get istag ruby-22-centos7:latest --template={{.image.metadata.name}}) -echo "[INFO] Tagging ruby-22-centos7@${tagid} to the same image stream and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7@${tagid} ruby-22-centos7:new-id-tag" -os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/ruby-22-centos7:new-id-tag" +# verify we can pull from tagged image (using image digest) +remove_docker_images "${DOCKER_REGISTRY}/custom/cross" namespace-pull +imagedigest=$(oc get istag hello-world:latest --template={{.image.metadata.name}}) +echo "[INFO] Tagging hello-world@${imagedigest} to the same image stream and pulling it" +os::cmd::expect_success "oc tag hello-world@${imagedigest} hello-world:new-id-tag" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/hello-world:new-id-tag" echo "[INFO] The same image stream pull successful" -imageid=$(docker images | grep cache/ruby-22-centos7 | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7@${tagid} to cross repository and pulling it" -os::cmd::expect_success "oc tag ruby-22-centos7@${tagid} cross:repo-pull-id" +remove_docker_images "${DOCKER_REGISTRY}/cache/hello-world" new-id-tag +echo "[INFO] Tagging hello-world@${imagedigest} to cross repository 
and pulling it" +os::cmd::expect_success "oc tag hello-world@${imagedigest} cross:repo-pull-id" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/cross:repo-pull-id" echo "[INFO] Cross repository pull successful" -imageid=$(docker images | grep cache/cross | awk '{ print $3 }') -os::cmd::expect_success "docker rmi -f ${imageid}" -echo "[INFO] Tagging ruby-22-centos7@${tagid} to cross namespace and pulling it" -os::cmd::expect_success "oc tag cache/ruby-22-centos7@${tagid} cross:namespace-pull-id -n custom" +remove_docker_images "${DOCKER_REGISTRY}/cache/cross" repo-pull-id +echo "[INFO] Tagging hello-world@${imagedigest} to cross repository and pulling it by id" +os::cmd::expect_success "oc tag hello-world@${imagedigest} cross:repo-pull-id" +os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/cache/cross@${imagedigest}" +echo "[INFO] Cross repository pull successful" + +remove_docker_images "${DOCKER_REGISTRY}/cache/cross" +echo "[INFO] Tagging hello-world@${imagedigest} to cross namespace and pulling it" +os::cmd::expect_success "oc tag cache/hello-world@${imagedigest} cross:namespace-pull-id -n custom" os::cmd::expect_success "docker pull ${DOCKER_REGISTRY}/custom/cross:namespace-pull-id" echo "[INFO] Cross namespace pull successful" From ca97574e6fb55aa079118878b1794b99f7ba072d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Thu, 14 Jul 2016 11:43:15 +0200 Subject: [PATCH 09/12] e2e: added tests for cross-repo mounting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michal Minář --- test/end-to-end/core.sh | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/test/end-to-end/core.sh b/test/end-to-end/core.sh index ef4ee6cc92eb..7fc13efa4224 100755 --- a/test/end-to-end/core.sh +++ b/test/end-to-end/core.sh @@ -136,6 +136,11 @@ os::cmd::expect_success "docker tag -f centos/ruby-22-centos7:latest ${DOCKER_RE 
os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/cache/ruby-22-centos7:latest" echo "[INFO] Pushed ruby-22-centos7" +# get image's digest +rubyimagedigest=$(oc get -o jsonpath='{.status.tags[?(@.tag=="latest")].items[0].image}' is/ruby-22-centos7) +# get a random, non-empty blob +rubyimageblob=$(oc get isimage -o go-template='{{range .image.dockerImageLayers}}{{if gt .size 1024.}}{{.name}},{{end}}{{end}}' ruby-22-centos7@${rubyimagedigest} | cut -d , -f 1) + # verify remote images can be pulled directly from the local registry echo "[INFO] Docker pullthrough" os::cmd::expect_success "oc import-image --confirm --from=mysql:latest mysql:pullthrough" @@ -205,7 +210,7 @@ echo "[INFO] Docker login as pusher to ${DOCKER_REGISTRY}" os::cmd::expect_success "docker login -u e2e-user -p ${pusher_token} -e pusher@openshift.com ${DOCKER_REGISTRY}" echo "[INFO] Docker login successful" -# Test anonymous registry access +echo "[INFO] Anonymous registry access" # setup: log out of docker, log into openshift as e2e-user to run policy commands, tag image to use for push attempts os::cmd::expect_success 'oc login -u e2e-user' os::cmd::expect_success 'docker pull busybox' @@ -229,13 +234,39 @@ os::cmd::expect_success 'oc policy add-role-to-user system:image-pusher system:a os::cmd::try_until_text 'oc policy who-can update imagestreams/layers -n custom' 'system:anonymous' os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/custom/cross:namespace-pull" os::cmd::expect_success "docker push ${DOCKER_REGISTRY}/custom/cross:namespace-pull-id" +echo "[INFO] Anonymous registry access successful" # log back into docker as e2e-user again os::cmd::expect_success "docker login -u e2e-user -p ${e2e_user_token} -e e2e-user@openshift.com ${DOCKER_REGISTRY}" +os::cmd::expect_success "oc new-project crossmount" +os::cmd::expect_success "oc create imagestream repo" + echo "[INFO] Back to 'default' project with 'admin' user..."
os::cmd::expect_success "oc project ${CLUSTER_ADMIN_CONTEXT}" os::cmd::expect_success_and_text 'oc whoami' 'system:admin' +os::cmd::expect_success "oc tag --source docker centos/ruby-22-centos7:latest -n custom ruby-22-centos7:latest" +os::cmd::expect_success 'oc policy add-role-to-user registry-viewer pusher -n custom' +os::cmd::expect_success 'oc policy add-role-to-user system:image-pusher pusher -n crossmount' + +echo "[INFO] Docker cross-repo mount" +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "404 Not Found" +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/cache/ruby-22-centos7/blobs/$rubyimageblob'" "200 OK" +# 202 means that cross-repo mount has failed (in this case because of blob doesn't exist in the source repository), client needs to reupload the blob +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/hello-world'" "202 Accepted" +# 201 means that blob has been cross mounted from given repository +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/ruby-22-centos7'" "201 Created" +# check that the blob is linked now +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "200 OK" +# remove pusher's permissions to read from the source repository +os::cmd::expect_success "oc policy remove-role-from-user system:image-pusher pusher -n cache" +# cross-repo mount failed because of access denied +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/ruby-22-centos7'" "202 Accepted" +# wait until image 
is imported +os::cmd::try_until_text "oc get -n custom is/ruby-22-centos7 --template='{{if .status.tags}}{{if gt (len .status.tags) 0}}tagged{{end}}{{end}}'" tagged $((20*TIME_SEC)) +# cross repo mount from a remote repository (cross-repo mount with pullthrough) +os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=custom/ruby-22-centos7'" "201 Created" +echo "[INFO] Docker cross-repo mount successful" # The build requires a dockercfg secret in the builder service account in order # to be able to push to the registry. Make sure it exists first. From ba11b23e86cf7d53155d788af627d7d461861a3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Wed, 27 Jul 2016 19:24:27 +0200 Subject: [PATCH 10/12] Allow to mock default registry client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michal Minář --- pkg/dockerregistry/server/auth.go | 22 ++++++++++++++++------ pkg/dockerregistry/server/token.go | 2 +- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/dockerregistry/server/auth.go b/pkg/dockerregistry/server/auth.go index 864656cf3f32..c63898811b9e 100644 --- a/pkg/dockerregistry/server/auth.go +++ b/pkg/dockerregistry/server/auth.go @@ -40,26 +40,36 @@ const ( TokenRealmKey = "token-realm" ) +// RegistryClient encapsulates getting access to the OpenShift API. +type RegistryClient interface { + // Clients return the authenticated client to use with the server. + Clients() (client.Interface, kclient.Interface, error) + // SafeClientConfig returns a client config without authentication info. + SafeClientConfig() restclient.Config +} + // DefaultRegistryClient is exposed for testing the registry with fake client. var DefaultRegistryClient = NewRegistryClient(clientcmd.NewConfig().BindToFile()) -// RegistryClient encapsulates getting access to the OpenShift API. 
-type RegistryClient struct { +// registryClient implements RegistryClient +type registryClient struct { config *clientcmd.Config } +var _ RegistryClient = ®istryClient{} + // NewRegistryClient creates a registry client. -func NewRegistryClient(config *clientcmd.Config) *RegistryClient { - return &RegistryClient{config: config} +func NewRegistryClient(config *clientcmd.Config) RegistryClient { + return ®istryClient{config: config} } // Client returns the authenticated client to use with the server. -func (r *RegistryClient) Clients() (client.Interface, kclient.Interface, error) { +func (r *registryClient) Clients() (client.Interface, kclient.Interface, error) { return r.config.Clients() } // SafeClientConfig returns a client config without authentication info. -func (r *RegistryClient) SafeClientConfig() restclient.Config { +func (r *registryClient) SafeClientConfig() restclient.Config { return clientcmd.AnonymousClientConfig(r.config.OpenShiftConfig()) } diff --git a/pkg/dockerregistry/server/token.go b/pkg/dockerregistry/server/token.go index 465f0da1cda0..c5dfb63ab94c 100644 --- a/pkg/dockerregistry/server/token.go +++ b/pkg/dockerregistry/server/token.go @@ -17,7 +17,7 @@ type tokenHandler struct { } // NewTokenHandler returns a handler that implements the docker token protocol -func NewTokenHandler(ctx context.Context, client *RegistryClient) http.Handler { +func NewTokenHandler(ctx context.Context, client RegistryClient) http.Handler { return &tokenHandler{ ctx: ctx, anonymousConfig: client.SafeClientConfig(), From 2a874eb9b1fbc556ab1ea28445e366673a4b4be9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Fri, 22 Jul 2016 11:30:16 +0200 Subject: [PATCH 11/12] Added unit tests for repository and blobdescriptorservice MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michal Minář --- .../server/blobdescriptorservice_test.go | 469 ++++++++++ .../server/repositorymiddleware_test.go | 
851 ++++++++++++++++++ 2 files changed, 1320 insertions(+) create mode 100644 pkg/dockerregistry/server/blobdescriptorservice_test.go create mode 100644 pkg/dockerregistry/server/repositorymiddleware_test.go diff --git a/pkg/dockerregistry/server/blobdescriptorservice_test.go b/pkg/dockerregistry/server/blobdescriptorservice_test.go new file mode 100644 index 000000000000..486ceeb3d6ab --- /dev/null +++ b/pkg/dockerregistry/server/blobdescriptorservice_test.go @@ -0,0 +1,469 @@ +package server + +import ( + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "sync" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/configuration" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/api/errcode" + "github.com/docker/distribution/registry/api/v2" + registryauth "github.com/docker/distribution/registry/auth" + "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/handlers" + "github.com/docker/distribution/registry/middleware/registry" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/testutil" + + "k8s.io/kubernetes/pkg/client/restclient" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient" + + osclient "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/client/testclient" + imagetest "github.com/openshift/origin/pkg/image/admission/testutil" +) + +// TestBlobDescriptorServiceIsApplied ensures that blobDescriptorService middleware gets applied. +// It relies on the fact that blobDescriptorService requires higher levels to set repository object on given +// context. If the object isn't given, its method will err out. 
+func TestBlobDescriptorServiceIsApplied(t *testing.T) { + ctx := context.Background() + + // don't do any authorization check + installFakeAccessController(t) + m := fakeBlobDescriptorService(t) + // to make other unit tests working + defer m.changeUnsetRepository(false) + + testImage := newImageForManifest(t, "user/app", sampleImageManifestSchema1, true) + testImageStream := testNewImageStreamObject("user", "app", "latest", testImage.Name) + client := &testclient.Fake{} + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, *testImageStream)) + client.AddReactor("get", "images", getFakeImageGetHandler(t, *testImage)) + + // TODO: get rid of those nasty global vars + backupRegistryClient := DefaultRegistryClient + DefaultRegistryClient = makeFakeRegistryClient(client, ktestclient.NewSimpleFake()) + defer func() { + // set it back once this test finishes to make other unit tests working + DefaultRegistryClient = backupRegistryClient + }() + + app := handlers.NewApp(ctx, &configuration.Configuration{ + Loglevel: "debug", + Auth: map[string]configuration.Parameters{ + fakeAuthorizerName: {"realm": fakeAuthorizerName}, + }, + Storage: configuration.Storage{ + "inmemory": configuration.Parameters{}, + "cache": configuration.Parameters{ + "blobdescriptor": "inmemory", + }, + "delete": configuration.Parameters{ + "enabled": true, + }, + }, + Middleware: map[string][]configuration.Middleware{ + "registry": {{Name: "openshift"}}, + "repository": {{Name: "openshift"}}, + "storage": {{Name: "openshift"}}, + }, + }) + server := httptest.NewServer(app) + router := v2.Router() + + serverURL, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + os.Setenv("DOCKER_REGISTRY_URL", serverURL.Host) + + desc := uploadTestBlob(t, serverURL, "user/app") + + for _, tc := range []struct { + name string + method string + endpoint string + vars []string + unsetRepository bool + expectedStatus int + 
expectedMethodInvocations map[string]int + }{ + { + name: "get blob with repository unset", + method: http.MethodGet, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "get blob", + method: http.MethodGet, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + expectedStatus: http.StatusOK, + expectedMethodInvocations: map[string]int{"Stat": 2}, + }, + + { + name: "stat blob with repository unset", + method: http.MethodHead, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "stat blob", + method: http.MethodHead, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + expectedStatus: http.StatusOK, + expectedMethodInvocations: map[string]int{"Stat": 3}, + }, + + { + name: "delete blob with repository unset", + method: http.MethodDelete, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "delete blob", + method: http.MethodDelete, + endpoint: v2.RouteNameBlob, + vars: []string{ + "name", "user/app", + "digest", desc.Digest.String(), + }, + expectedStatus: http.StatusAccepted, + expectedMethodInvocations: map[string]int{"Stat": 1, "Clear": 1}, + }, + + { + // this is expected to succeed because we don't check local links (the manifest is retrieved from + // etcd) + name: "get manifest with repository unset", + method: http.MethodGet, + 
endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", "latest", + }, + unsetRepository: true, + expectedStatus: http.StatusOK, + //expectedMethodInvocations: map[string]int{"Stat": 2}, + }, + + { + name: "get manifest", + method: http.MethodGet, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", "latest", + }, + expectedStatus: http.StatusOK, + //expectedMethodInvocations: map[string]int{"Stat": 0}, + }, + + { + // this is expected to succeed because we don't check local links (the manifest is retrieved from + // etcd) + name: "delete manifest with repository unset", + method: http.MethodDelete, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", testImage.Name, + }, + unsetRepository: true, + expectedStatus: http.StatusInternalServerError, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + + { + name: "delete manifest", + method: http.MethodDelete, + endpoint: v2.RouteNameManifest, + vars: []string{ + "name", "user/app", + "reference", testImage.Name, + }, + expectedStatus: http.StatusNotFound, + expectedMethodInvocations: map[string]int{"Stat": 1}, + }, + } { + m.clearStats() + m.changeUnsetRepository(tc.unsetRepository) + + route := router.GetRoute(tc.endpoint).Host(serverURL.Host) + u, err := route.URL(tc.vars...) 
+ if err != nil { + t.Errorf("[%s] failed to build route: %v", tc.name, err) + continue + } + + req, err := http.NewRequest(tc.method, u.String(), nil) + if err != nil { + t.Errorf("[%s] failed to make request: %v", tc.name, err) + } + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Errorf("[%s] failed to do the request: %v", tc.name, err) + continue + } + defer resp.Body.Close() + + if resp.StatusCode != tc.expectedStatus { + t.Errorf("[%s] unexpected status code: %v != %v", tc.name, resp.StatusCode, tc.expectedStatus) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + content, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("[%s] failed to read body: %v", tc.name, err) + } else if len(content) > 0 { + errs := errcode.Errors{} + err := errs.UnmarshalJSON(content) + if err != nil { + t.Logf("[%s] failed to parse body as error: %v", tc.name, err) + t.Logf("[%s] received body: %v", tc.name, string(content)) + } else { + t.Logf("[%s] received errors: %#+v", tc.name, errs) + } + } + } + + stats := m.getStats() + for method, exp := range tc.expectedMethodInvocations { + invoked := stats[method] + if invoked != exp { + t.Errorf("[%s] unexpected number of infocations of method %q: %v != %v", tc.name, method, invoked, exp) + } + } + for method, invoked := range stats { + if _, ok := tc.expectedMethodInvocations[method]; !ok { + t.Errorf("[%s] unexpected method %q invoked %d times", tc.name, method, invoked) + } + } + } +} + +type testBlobDescriptorManager struct { + mu sync.Mutex + stats map[string]int + unsetRepository bool +} + +func (m *testBlobDescriptorManager) clearStats() { + m.mu.Lock() + defer m.mu.Unlock() + + for k := range m.stats { + delete(m.stats, k) + } +} + +func (m *testBlobDescriptorManager) methodInvoked(methodName string) int { + m.mu.Lock() + defer m.mu.Unlock() + + newCount := m.stats[methodName] + 1 + m.stats[methodName] = newCount + + return newCount +} + +// 
getUnsetRepository returns true if the testBlobDescriptorService should unset repository from context before
+// passing down the call
+func (m *testBlobDescriptorManager) getUnsetRepository() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	return m.unsetRepository
+}
+
+// changeUnsetRepository allows configuring whether the testBlobDescriptorService should unset repository
+// from context before passing down the call
+func (m *testBlobDescriptorManager) changeUnsetRepository(unset bool) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.unsetRepository = unset
+}
+
+func (m *testBlobDescriptorManager) getStats() map[string]int {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	stats := make(map[string]int)
+	for k, v := range m.stats {
+		stats[k] = v
+	}
+	return stats
+}
+
+// fakeBlobDescriptorService installs a fake blob descriptor on top of blobDescriptorService that collects
+// stats of method invocations. unsetRepository commands the controller to remove repository object from
+// context passed down to blobDescriptorService if true.
+func fakeBlobDescriptorService(t *testing.T) *testBlobDescriptorManager { + m := &testBlobDescriptorManager{ + stats: make(map[string]int), + } + middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&testBlobDescriptorServiceFactory{t: t, m: m})) + return m +} + +type testBlobDescriptorServiceFactory struct { + t *testing.T + m *testBlobDescriptorManager +} + +func (bf *testBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService { + if _, ok := svc.(*blobDescriptorService); !ok { + svc = (&blobDescriptorServiceFactory{}).BlobAccessController(svc) + } + return &testBlobDescriptorService{BlobDescriptorService: svc, t: bf.t, m: bf.m} +} + +type testBlobDescriptorService struct { + distribution.BlobDescriptorService + t *testing.T + m *testBlobDescriptorManager +} + +func (bs *testBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + bs.m.methodInvoked("Stat") + if bs.m.getUnsetRepository() { + bs.t.Logf("unsetting repository from the context") + ctx = WithRepository(ctx, nil) + } + + return bs.BlobDescriptorService.Stat(ctx, dgst) +} +func (bs *testBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error { + bs.m.methodInvoked("Clear") + if bs.m.getUnsetRepository() { + bs.t.Logf("unsetting repository from the context") + ctx = WithRepository(ctx, nil) + } + return bs.BlobDescriptorService.Clear(ctx, dgst) +} + +// uploadTestBlob generates a random tar file and uploads it to the given repository. 
+func uploadTestBlob(t *testing.T, serverURL *url.URL, repoName string) distribution.Descriptor { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("unexpected error generating test layer file: %v", err) + } + dgst := digest.Digest(ds) + + ctx := context.Background() + ref, err := reference.ParseNamed(repoName) + if err != nil { + t.Fatal(err) + } + t.Logf("server url: %s", serverURL.String()) + repo, err := client.NewRepository(ctx, ref, serverURL.String(), nil) + if err != nil { + t.Fatalf("failed to get repository %q: %v", repoName, err) + } + blobs := repo.Blobs(ctx) + wr, err := blobs.Create(ctx) + if err != nil { + t.Fatal(err) + } + n, err := io.Copy(wr, rs) + if err != nil { + t.Fatalf("unexpected error copying to upload: %v", err) + } + desc, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}) + if err != nil { + t.Fatal(err) + } + + t.Logf("uploaded generated layer of size %d with digest %q\n", n, dgst.String()) + + return desc +} + +const fakeAuthorizerName = "fake" + +// installFakeAccessController installs an authorizer that allows access anywhere to anybody. 
+func installFakeAccessController(t *testing.T) { + registryauth.Register(fakeAuthorizerName, registryauth.InitFunc( + func(options map[string]interface{}) (registryauth.AccessController, error) { + return &fakeAccessController{t: t}, nil + })) +} + +type fakeAccessController struct { + t *testing.T +} + +var _ registryauth.AccessController = &fakeAccessController{} + +func (f *fakeAccessController) Authorized(ctx context.Context, access ...registryauth.Access) (context.Context, error) { + for _, access := range access { + f.t.Logf("fake authorizer: authorizing access to %s:%s:%s", access.Resource.Type, access.Resource.Name, access.Action) + } + + ctx = WithAuthPerformed(ctx) + return ctx, nil +} + +func makeFakeRegistryClient(client osclient.Interface, kClient kclient.Interface) RegistryClient { + return &fakeRegistryClient{ + client: client, + kClient: kClient, + } +} + +type fakeRegistryClient struct { + client osclient.Interface + kClient kclient.Interface +} + +func (f *fakeRegistryClient) Clients() (osclient.Interface, kclient.Interface, error) { + return f.client, f.kClient, nil +} +func (f *fakeRegistryClient) SafeClientConfig() restclient.Config { + return (®istryClient{}).SafeClientConfig() +} diff --git a/pkg/dockerregistry/server/repositorymiddleware_test.go b/pkg/dockerregistry/server/repositorymiddleware_test.go new file mode 100644 index 000000000000..88b093883a86 --- /dev/null +++ b/pkg/dockerregistry/server/repositorymiddleware_test.go @@ -0,0 +1,851 @@ +package server + +import ( + "encoding/json" + "fmt" + "io" + "reflect" + "strings" + "testing" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/manifest/schema1" + // "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/storage" + 
"github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" + + kapi "k8s.io/kubernetes/pkg/api" + kerrors "k8s.io/kubernetes/pkg/api/errors" + ktestclient "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/diff" + + "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/client/testclient" + imagetest "github.com/openshift/origin/pkg/image/admission/testutil" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +const ( + // testImageLayerCount says how many layers to generate per image + testImageLayerCount = 2 + testBlobRepositoryCacheTTL = time.Millisecond * 500 +) + +func TestRepositoryBlobStat(t *testing.T) { + quotaEnforcing = "aEnforcingConfig{} + + ctx := context.Background() + // this driver holds all the testing blobs in memory during the whole test run + driver := inmemory.New() + // generate two images and store their blobs in the driver + testImages, err := populateTestStorage(t, driver, true, 1, map[string]int{"nm/is:latest": 1, "nm/repo:missing-layer-links": 1}, nil) + if err != nil { + t.Fatal(err) + } + // generate an image and store its blobs in the driver; the resulting image will lack managed by openshift + // annotation + testImages, err = populateTestStorage(t, driver, false, 1, map[string]int{"nm/unmanaged:missing-layer-links": 1}, testImages) + if err != nil { + t.Fatal(err) + } + + // remove layer repository links from two of the above images; keep the uploaded blobs in the global + // blostore though + for _, name := range []string{"nm/repo:missing-layer-links", "nm/unmanaged:missing-layer-links"} { + repoName := strings.Split(name, ":")[0] + for _, layer := range 
testImages[name][0].DockerImageLayers { + dgst := digest.Digest(layer.Name) + alg, hex := dgst.Algorithm(), dgst.Hex() + err := driver.Delete(ctx, fmt.Sprintf("/docker/registry/v2/repositories/%s/_layers/%s/%s", repoName, alg, hex)) + if err != nil { + t.Fatalf("failed to delete layer link %q from repository %q: %v", layer.Name, repoName, err) + } + } + } + + // generate random images without storing its blobs in the driver + etcdOnlyImages := map[string]*imageapi.Image{} + for _, d := range []struct { + name string + managed bool + }{{"nm/is", true}, {"registry.org:5000/user/app", false}} { + etcdOnlyImages[d.name] = newImageForManifest(t, d.name, sampleImageManifestSchema1, d.managed) + } + + for _, tc := range []struct { + name string + stat string + images []imageapi.Image + imageStreams []imageapi.ImageStream + pullthrough bool + skipAuth bool + deferredErrors deferredErrors + expectedDescriptor distribution.Descriptor + expectedError error + expectedActions []clientAction + }{ + { + name: "local stat", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + imageStreams: []imageapi.ImageStream{{ObjectMeta: kapi.ObjectMeta{Namespace: "nm", Name: "is"}}}, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/is:latest"][0].DockerImageLayers[0]), + }, + + { + name: "blob only tagged in image stream", + stat: "nm/repo@" + testImages["nm/repo:missing-layer-links"][0].DockerImageLayers[1].Name, + images: []imageapi.Image{*testImages["nm/repo:missing-layer-links"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "repo", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/repo:missing-layer-links"][0].Name, + }, + }, + }, + }, + }, + }, + }, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/repo:missing-layer-links"][0].DockerImageLayers[1]), + expectedActions: 
[]clientAction{{"get", "imagestreams"}, {"get", "images"}}, + }, + + { + name: "blob referenced only by not managed image with pullthrough on", + stat: "nm/unmanaged@" + testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1].Name, + images: []imageapi.Image{*testImages["nm/unmanaged:missing-layer-links"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "unmanaged", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/unmanaged:missing-layer-links"][0].Name, + }, + }, + }, + }, + }, + }, + }, + pullthrough: true, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1]), + expectedActions: []clientAction{{"get", "imagestreams"}, {"get", "images"}}, + }, + + { + // TODO: this should err out because of missing image stream. + // Unfortunately, it's not the case. Until we start storing layer links in etcd, we depend on + // local layer links. 
+ name: "layer link present while image stream not found", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + images: []imageapi.Image{*testImages["nm/is:latest"][0]}, + expectedDescriptor: testNewDescriptorForLayer(testImages["nm/is:latest"][0].DockerImageLayers[0]), + }, + + { + name: "blob only tagged by not managed image with pullthrough off", + stat: "nm/repo@" + testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1].Name, + images: []imageapi.Image{*testImages["nm/unmanaged:missing-layer-links"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "repo", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: testImages["nm/unmanaged:missing-layer-links"][0].DockerImageLayers[1].Name, + }, + }, + }, + }, + }, + }, + }, + expectedError: distribution.ErrBlobUnknown, + expectedActions: []clientAction{{"get", "imagestreams"}, {"get", "images"}}, + }, + + { + name: "blob not stored locally but referred in image stream", + stat: "nm/is@" + etcdOnlyImages["nm/is"].DockerImageLayers[1].Name, + images: []imageapi.Image{*etcdOnlyImages["nm/is"]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "is", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: []imageapi.TagEvent{ + { + Image: etcdOnlyImages["nm/is"].Name, + }, + }, + }, + }, + }, + }, + }, + expectedError: distribution.ErrBlobUnknown, + }, + + { + name: "blob does not exist", + stat: "nm/repo@" + etcdOnlyImages["nm/is"].DockerImageLayers[0].Name, + images: []imageapi.Image{*testImages["nm/is:latest"][0]}, + imageStreams: []imageapi.ImageStream{ + { + ObjectMeta: kapi.ObjectMeta{ + Namespace: "nm", + Name: "repo", + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "latest": { + Items: 
[]imageapi.TagEvent{ + { + Image: testImages["nm/is:latest"][0].Name, + }, + }, + }, + }, + }, + }, + }, + expectedError: distribution.ErrBlobUnknown, + }, + + { + name: "auth not performed", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + imageStreams: []imageapi.ImageStream{{ObjectMeta: kapi.ObjectMeta{Namespace: "nm", Name: "is"}}}, + skipAuth: true, + expectedError: fmt.Errorf("openshift.auth.completed missing from context"), + }, + + { + name: "deferred error", + stat: "nm/is@" + testImages["nm/is:latest"][0].DockerImageLayers[0].Name, + imageStreams: []imageapi.ImageStream{{ObjectMeta: kapi.ObjectMeta{Namespace: "nm", Name: "is"}}}, + deferredErrors: deferredErrors{"nm/is": ErrOpenShiftAccessDenied}, + expectedError: ErrOpenShiftAccessDenied, + }, + } { + ref, err := reference.Parse(tc.stat) + if err != nil { + t.Errorf("[%s] failed to parse blob reference %q: %v", tc.name, tc.stat, err) + continue + } + canonical, ok := ref.(reference.Canonical) + if !ok { + t.Errorf("[%s] not a canonical reference %q", tc.name, ref.String()) + continue + } + + cachedLayers, err = newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize) + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + if !tc.skipAuth { + ctx = WithAuthPerformed(ctx) + } + if tc.deferredErrors != nil { + ctx = WithDeferredErrors(ctx, tc.deferredErrors) + } + + client := &testclient.Fake{} + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, tc.imageStreams...)) + client.AddReactor("get", "images", getFakeImageGetHandler(t, tc.images...)) + + reg, err := newTestRegistry(ctx, client, driver, defaultBlobRepositoryCacheTTL, tc.pullthrough, true) + if err != nil { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + continue + } + + repo, err := reg.Repository(ctx, canonical) + if err != nil { + t.Errorf("[%s] unexpected error: %v", tc.name, err) + continue + } + + desc, err := repo.Blobs(ctx).Stat(ctx, canonical.Digest()) 
+ if err != nil && tc.expectedError == nil { + t.Errorf("[%s] got unexpected stat error: %v", tc.name, err) + continue + } + if err == nil && tc.expectedError != nil { + t.Errorf("[%s] got unexpected non-error", tc.name) + continue + } + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("[%s] got unexpected error: %s", tc.name, diff.ObjectGoPrintDiff(err, tc.expectedError)) + continue + } + if tc.expectedError == nil && !reflect.DeepEqual(desc, tc.expectedDescriptor) { + t.Errorf("[%s] got unexpected descriptor: %s", tc.name, diff.ObjectGoPrintDiff(desc, tc.expectedDescriptor)) + } + + compareActions(t, tc.name, client.Actions(), tc.expectedActions) + } +} + +func TestRepositoryBlobStatCacheEviction(t *testing.T) { + const blobRepoCacheTTL = time.Millisecond * 500 + + quotaEnforcing = "aEnforcingConfig{} + ctx := WithAuthPerformed(context.Background()) + + // this driver holds all the testing blobs in memory during the whole test run + driver := inmemory.New() + // generate two images and store their blobs in the driver + testImages, err := populateTestStorage(t, driver, true, 1, map[string]int{"nm/is:latest": 1}, nil) + if err != nil { + t.Fatal(err) + } + testImage := testImages["nm/is:latest"][0] + testImageStream := testNewImageStreamObject("nm", "is", "latest", testImage.Name) + + blob1Desc := testNewDescriptorForLayer(testImage.DockerImageLayers[0]) + blob1Dgst := blob1Desc.Digest + blob2Desc := testNewDescriptorForLayer(testImage.DockerImageLayers[1]) + blob2Dgst := blob2Desc.Digest + + // remove repo layer repo link of the image's second blob + alg, hex := blob2Dgst.Algorithm(), blob2Dgst.Hex() + err = driver.Delete(ctx, fmt.Sprintf("/docker/registry/v2/repositories/%s/_layers/%s/%s", "nm/is", alg, hex)) + + cachedLayers, err = newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize) + if err != nil { + t.Fatal(err) + } + + client := &testclient.Fake{} + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, 
*testImageStream)) + client.AddReactor("get", "images", getFakeImageGetHandler(t, *testImage)) + + reg, err := newTestRegistry(ctx, client, driver, blobRepoCacheTTL, false, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + ref, err := reference.ParseNamed("nm/is") + if err != nil { + t.Errorf("failed to parse blob reference %q: %v", "nm/is", err) + } + + repo, err := reg.Repository(ctx, ref) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // hit the layer repo link - cache the result + desc, err := repo.Blobs(ctx).Stat(ctx, blob1Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob1Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob1Desc) + } + + compareActions(t, "no actions expected", client.Actions(), []clientAction{}) + + // remove layer repo link, delete the association from cache as well + err = repo.Blobs(ctx).Delete(ctx, blob1Dgst) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + // query etcd + desc, err = repo.Blobs(ctx).Stat(ctx, blob1Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob1Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob1Desc) + } + + expectedActions := []clientAction{{"get", "imagestreams"}, {"get", "images"}} + compareActions(t, "1st roundtrip to etcd", client.Actions(), expectedActions) + + // remove the underlying blob + vacuum := storage.NewVacuum(ctx, driver) + err = vacuum.RemoveBlob(blob1Dgst.String()) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + // fail because the blob isn't stored locally + desc, err = repo.Blobs(ctx).Stat(ctx, blob1Dgst) + if err == nil { + t.Fatalf("got unexpected non error: %v", err) + } + if err != distribution.ErrBlobUnknown { + t.Fatalf("got unexpected error: %#+v", err) + } + + // cache hit - don't query etcd + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + 
if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob2Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob2Desc) + } + + compareActions(t, "no etcd query", client.Actions(), expectedActions) + + lastStatTimestamp := time.Now() + + // hit the cache + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob2Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob2Desc) + } + + // cache hit - no additional etcd query + compareActions(t, "no roundrip to etcd", client.Actions(), expectedActions) + + t.Logf("sleeping %s while waiting for eviction of blob %q from cache", blobRepoCacheTTL.String(), blob2Dgst.String()) + time.Sleep(blobRepoCacheTTL - (time.Now().Sub(lastStatTimestamp))) + + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err != nil { + t.Fatalf("got unexpected stat error: %v", err) + } + if !reflect.DeepEqual(desc, blob2Desc) { + t.Fatalf("got unexpected descriptor: %#+v != %#+v", desc, blob2Desc) + } + + expectedActions = append(expectedActions, []clientAction{{"get", "imagestreams"}, {"get", "images"}}...) 
+ compareActions(t, "2nd roundtrip to etcd", client.Actions(), expectedActions) + + err = vacuum.RemoveBlob(blob2Dgst.String()) + if err != nil { + t.Fatalf("got unexpected error: %v", err) + } + + // fail because the blob isn't stored locally + desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) + if err == nil { + t.Fatalf("got unexpected non error: %v", err) + } + if err != distribution.ErrBlobUnknown { + t.Fatalf("got unexpected error: %#+v", err) + } +} + +type clientAction struct { + verb string + resource string +} + +func getFakeImageGetHandler(t *testing.T, iss ...imageapi.Image) ktestclient.ReactionFunc { + return func(action ktestclient.Action) (handled bool, ret runtime.Object, err error) { + switch a := action.(type) { + case ktestclient.GetAction: + for _, is := range iss { + if a.GetName() == is.Name { + t.Logf("images get handler: returning image %s", is.Name) + return true, &is, nil + } + } + + err := kerrors.NewNotFound(kapi.Resource("images"), a.GetName()) + t.Logf("image get handler: %v", err) + return true, nil, err + } + return false, nil, nil + } +} + +const sampleImageManifestSchema1 = `{ + "schemaVersion": 1, + "name": "nm/is", + "tag": "latest", + "architecture": "", + "fsLayers": [ + { + "blobSum": "sha256:b2c5513bd934a7efb412c0dd965600b8cb00575b585eaff1cb980b69037fe6cd" + }, + { + "blobSum": "sha256:2dde6f11a89463bf20dba3b47d8b3b6de7cdcc19e50634e95a18dd95c278768d" + } + ], + "history": [ + { + "v1Compatibility": "{\"size\":18407936}" + }, + { + "v1Compatibility": "{\"size\":19387392}" + } + ], + "signatures": [ + { + "header": { + "jwk": { + "crv": "P-256", + "kid": "5HTY:A24B:L6PG:TQ3G:GMAK:QGKZ:ICD4:S7ZJ:P5JX:UTMP:XZLK:ZXVH", + "kty": "EC", + "x": "j5YnDSyrVIt3NquUKvcZIpbfeD8HLZ7BVBFL4WutRBM", + "y": "PBgFAZ3nNakYN3H9enhrdUrQ_HPYzb8oX5rtJxJo1Y8" + }, + "alg": "ES256" + }, + "signature": "1rXiEmWnf9eL7m7Wy3K4l25-Zv2XXl5GgqhM_yjT0ujPmTn0uwfHcCWlweHa9gput3sECj507eQyGpBOF5rD6Q", + "protected": 
"eyJmb3JtYXRMZW5ndGgiOjQ4NSwiZm9ybWF0VGFpbCI6IkNuMCIsInRpbWUiOiIyMDE2LTA3LTI2VDExOjQ2OjQ2WiJ9" + } + ] +}` + +func newImageForManifest(t *testing.T, repoName string, rawManifest string, managedByOpenShift bool) *imageapi.Image { + var versioned manifest.Versioned + if err := json.Unmarshal([]byte(rawManifest), &versioned); err != nil { + t.Fatal(err) + } + + _, desc, err := distribution.UnmarshalManifest(versioned.MediaType, []byte(rawManifest)) + if err != nil { + t.Fatal(err) + } + + annotations := make(map[string]string) + if managedByOpenShift { + annotations[imageapi.ManagedByOpenShiftAnnotation] = "true" + } + + img := &imageapi.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: desc.Digest.String(), + Annotations: annotations, + }, + DockerImageReference: fmt.Sprintf("localhost:5000/%s@%s", repoName, desc.Digest.String()), + DockerImageManifest: string(rawManifest), + } + + if err := imageapi.ImageWithMetadata(img); err != nil { + t.Fatal(err) + } + + return img +} + +func storeTestImage( + ctx context.Context, + reg distribution.Namespace, + imageReference reference.NamedTagged, + schemaVersion int, + managedByOpenShift bool, +) (*imageapi.Image, error) { + repo, err := reg.Repository(ctx, imageReference) + if err != nil { + return nil, fmt.Errorf("unexpected error getting repo %q: %v", imageReference.Name(), err) + } + + var ( + m distribution.Manifest + m1 schema1.Manifest + ) + switch schemaVersion { + case 1: + m1 = schema1.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: imageReference.Name(), + Tag: imageReference.Tag(), + } + case 2: + // TODO + fallthrough + default: + return nil, fmt.Errorf("unsupported manifest version %d", schemaVersion) + } + + for i := 0; i < testImageLayerCount; i++ { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + return nil, fmt.Errorf("unexpected error generating test layer file: %v", err) + } + dgst := digest.Digest(ds) + + wr, err := repo.Blobs(ctx).Create(ctx) + if err != nil { + 
return nil, fmt.Errorf("unexpected error creating test upload: %v", err) + } + defer wr.Close() + + n, err := io.Copy(wr, rs) + if err != nil { + return nil, fmt.Errorf("unexpected error copying to upload: %v", err) + } + + if schemaVersion == 1 { + m1.FSLayers = append(m1.FSLayers, schema1.FSLayer{BlobSum: dgst}) + m1.History = append(m1.History, schema1.History{V1Compatibility: fmt.Sprintf(`{"size":%d}`, n)}) + } // TODO v2 + + if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst, MediaType: schema1.MediaTypeManifestLayer}); err != nil { + return nil, fmt.Errorf("unexpected error finishing upload: %v", err) + } + } + + var dgst digest.Digest + var payload []byte + + if schemaVersion == 1 { + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + return nil, fmt.Errorf("unexpected error generating private key: %v", err) + } + + m, err = schema1.Sign(&m1, pk) + if err != nil { + return nil, fmt.Errorf("error signing manifest: %v", err) + } + + _, payload, err = m.Payload() + if err != nil { + return nil, fmt.Errorf("error getting payload %#v", err) + } + + dgst = digest.FromBytes(payload) + } //TODO v2 + + image := &imageapi.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: dgst.String(), + }, + DockerImageManifest: string(payload), + DockerImageReference: imageReference.Name() + "@" + dgst.String(), + } + + if managedByOpenShift { + image.Annotations = map[string]string{imageapi.ManagedByOpenShiftAnnotation: "true"} + } + + if schemaVersion == 1 { + signedManifest := m.(*schema1.SignedManifest) + signatures, err := signedManifest.Signatures() + if err != nil { + return nil, err + } + + for _, signDigest := range signatures { + image.DockerImageSignatures = append(image.DockerImageSignatures, signDigest) + } + } + + err = imageapi.ImageWithMetadata(image) + if err != nil { + return nil, fmt.Errorf("failed to fill image with metadata: %v", err) + } + + return image, nil +} + +func populateTestStorage( + t *testing.T, + driver 
driver.StorageDriver, + setManagedByOpenShift bool, + schemaVersion int, + repoImages map[string]int, + testImages map[string][]*imageapi.Image, +) (map[string][]*imageapi.Image, error) { + ctx := context.Background() + reg, err := storage.NewRegistry(ctx, driver) + if err != nil { + t.Fatalf("error creating registry: %v", err) + } + + result := make(map[string][]*imageapi.Image) + for key, value := range testImages { + images := make([]*imageapi.Image, len(value)) + copy(images, value) + result[key] = images + } + + for imageReference := range repoImages { + parsed, err := reference.Parse(imageReference) + if err != nil { + t.Fatalf("failed to parse reference %q: %v", imageReference, err) + } + namedTagged, ok := parsed.(reference.NamedTagged) + if !ok { + t.Fatalf("expected NamedTagged reference, not %T", parsed) + } + + imageCount := repoImages[imageReference] + + for i := 0; i < imageCount; i++ { + img, err := storeTestImage(ctx, reg, namedTagged, schemaVersion, setManagedByOpenShift) + if err != nil { + t.Fatal(err) + } + arr := result[imageReference] + t.Logf("created image %s@%s image with layers:", namedTagged.Name(), img.Name) + for _, l := range img.DockerImageLayers { + t.Logf(" %s of size %d", l.Name, l.LayerSize) + } + result[imageReference] = append(arr, img) + } + } + + return result, nil +} + +func newTestRegistry( + ctx context.Context, + osClient client.Interface, + storageDriver driver.StorageDriver, + blobrepositorycachettl time.Duration, + pullthrough bool, + useBlobDescriptorCacheProvider bool, +) (*testRegistry, error) { + if storageDriver == nil { + storageDriver = inmemory.New() + } + dockerStorageDriver = storageDriver + + opts := []storage.RegistryOption{ + storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{}), + storage.EnableDelete, + storage.EnableRedirect, + } + if useBlobDescriptorCacheProvider { + cacheProvider := cache.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()) + opts = append(opts, 
storage.BlobDescriptorCacheProvider(cacheProvider)) + } + + reg, err := storage.NewRegistry(ctx, dockerStorageDriver, opts...) + if err != nil { + return nil, err + } + dockerRegistry = reg + + return &testRegistry{ + Namespace: dockerRegistry, + osClient: osClient, + blobrepositorycachettl: blobrepositorycachettl, + pullthrough: pullthrough, + }, nil +} + +type testRegistry struct { + distribution.Namespace + osClient client.Interface + pullthrough bool + blobrepositorycachettl time.Duration +} + +var _ distribution.Namespace = &testRegistry{} + +func (r *testRegistry) Repository(ctx context.Context, ref reference.Named) (distribution.Repository, error) { + repo, err := r.Namespace.Repository(ctx, ref) + if err != nil { + return nil, err + } + + kFakeClient := ktestclient.NewSimpleFake() + + parts := strings.SplitN(ref.Name(), "/", 3) + if len(parts) != 2 { + return nil, fmt.Errorf("failed to parse repository name %q", ref.Name()) + } + + return &repository{ + Repository: repo, + + ctx: ctx, + quotaClient: kFakeClient, + limitClient: kFakeClient, + registryOSClient: r.osClient, + registryAddr: "localhost:5000", + namespace: parts[0], + name: parts[1], + blobrepositorycachettl: r.blobrepositorycachettl, + cachedLayers: cachedLayers, + pullthrough: r.pullthrough, + }, nil +} + +func testNewImageStreamObject(namespace, name, tag, imageName string) *imageapi.ImageStream { + return &imageapi.ImageStream{ + ObjectMeta: kapi.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + tag: { + Items: []imageapi.TagEvent{ + { + Image: imageName, + }, + }, + }, + }, + }, + } +} + +func testNewDescriptorForLayer(layer imageapi.ImageLayer) distribution.Descriptor { + return distribution.Descriptor{ + Digest: digest.Digest(layer.Name), + MediaType: "application/octet-stream", + Size: layer.LayerSize, + } +} + +func compareActions(t *testing.T, testCaseName string, actions []ktestclient.Action, 
expectedActions []clientAction) { + for i, action := range actions { + if i >= len(expectedActions) { + t.Errorf("[%s] got unexpected client action: %#+v", testCaseName, action) + continue + } + expected := expectedActions[i] + if !action.Matches(expected.verb, expected.resource) { + t.Errorf("[%s] expected client action %s[%s], got instead: %#+v", testCaseName, expected.verb, expected.resource, action) + } + } + for i := len(actions); i < len(expectedActions); i++ { + expected := expectedActions[i] + t.Errorf("[%s] expected action %s[%s] did not happen", testCaseName, expected.verb, expected.resource) + } +} From 1670b8c751ea4e7c161d093a172172944eb34de7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Fri, 29 Jul 2016 22:44:46 +0200 Subject: [PATCH 12/12] Enable remote layer federation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During a cross-repo mount push where the mounted blob belongs to remote image, tag the image into target image stream. This will utilize pullthrough to federate image's blobs without storing them in the registry. Image is tagged under `_pullthrough_dep_${blobdgst}` tag. 
Signed-off-by: Michal Minář --- pkg/dockerregistry/server/errorblobstore.go | 92 ++++++++++++++++++- .../server/pullthroughblobstore.go | 9 ++ pkg/dockerregistry/server/util.go | 9 ++ test/end-to-end/core.sh | 2 + 4 files changed, 107 insertions(+), 5 deletions(-) diff --git a/pkg/dockerregistry/server/errorblobstore.go b/pkg/dockerregistry/server/errorblobstore.go index af8dc3f5e822..57fe5f37b9ab 100644 --- a/pkg/dockerregistry/server/errorblobstore.go +++ b/pkg/dockerregistry/server/errorblobstore.go @@ -8,6 +8,11 @@ import ( "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/reference" + + kapi "k8s.io/kubernetes/pkg/api" + kerrors "k8s.io/kubernetes/pkg/api/errors" + + imageapi "github.com/openshift/origin/pkg/image/api" ) // errorBlobStore wraps a distribution.BlobStore for a particular repo. @@ -55,6 +60,7 @@ func (r *errorBlobStore) Create(ctx context.Context, options ...distribution.Blo } ctx = WithRepository(ctx, r.repo) + var pullthroughSourceImageReference *imageapi.DockerImageReference opts, err := effectiveCreateOptions(options) if err != nil { @@ -62,7 +68,13 @@ func (r *errorBlobStore) Create(ctx context.Context, options ...distribution.Blo } err = checkPendingCrossMountErrors(ctx, opts) if err == nil && opts.Mount.ShouldMount { - desc, err = statSourceRepository(ctx, opts.Mount.From, opts.Mount.From.Digest()) + context.GetLogger(ctx).Debugf("checking for presence of blob %s in a source repository %s", opts.Mount.From.Digest().String(), opts.Mount.From.Name()) + desc, pullthroughSourceImageReference, err = statSourceRepository(ctx, r.repo, opts.Mount.From, opts.Mount.From.Digest()) + } + if err == nil && pullthroughSourceImageReference != nil { + ref := pullthroughSourceImageReference.MostSpecific() + context.GetLogger(ctx).Debugf("trying to tag source image %s into image stream %s", ref.Exact(), r.repo.Named().Name()) + err = tagPullthroughSourceImageInTargetRepository(ctx, &ref, r.repo, 
opts.Mount.From.Digest()) } if err != nil { @@ -154,10 +166,80 @@ func (f statCrossMountCreateOptions) Apply(v interface{}) error { return nil } -func statSourceRepository(ctx context.Context, sourceRepoName reference.Named, dgst digest.Digest) (distribution.Descriptor, error) { - repo, err := dockerRegistry.Repository(ctx, sourceRepoName) +// statSourceRepository finds a blob in the source repository of cross-repo mount and returns its descriptor +// if found. If the blob is not stored locally but it is available in a remote repository, the +// pullthroughSourceImageReference output parameter will be set to contain a reference to an image containing +// it. +func statSourceRepository( + ctx context.Context, + destRepo *repository, + sourceRepoName reference.Named, + dgst digest.Digest, +) (desc distribution.Descriptor, pullthroughSourceImageReference *imageapi.DockerImageReference, err error) { + upstreamRepo, err := dockerRegistry.Repository(ctx, sourceRepoName) if err != nil { - return distribution.Descriptor{}, err + return distribution.Descriptor{}, nil, err + } + namespace, name, err := getNamespaceName(sourceRepoName.Name()) + if err != nil { + return distribution.Descriptor{}, nil, err + } + + repo := *destRepo + repo.namespace = namespace + repo.name = name + repo.Repository = upstreamRepo + + // ask pullthrough blob store to set source image reference if the blob is found in remote repository + var ref imageapi.DockerImageReference + ctx = WithPullthroughSourceImageReference(ctx, &ref) + + desc, err = repo.Blobs(ctx).Stat(ctx, dgst) + if err == nil && len(ref.Registry) != 0 { + pullthroughSourceImageReference = &ref } - return repo.Blobs(ctx).Stat(ctx, dgst) + return +} + +// tagPullthroughSourceImageInTargetRepository creates a tag in a destination image stream of cross-repo mount +// referencing a remote image that contains the blob.
With the reference present in the target stream, the +// pullthrough will allow serving the blob from the image stream without storing it locally. +func tagPullthroughSourceImageInTargetRepository(ctx context.Context, ref *imageapi.DockerImageReference, destRepo *repository, dgst digest.Digest) error { + if len(ref.ID) == 0 { + return fmt.Errorf("cannot tag image lacking ID as a pullthrough source (%s)", ref.Exact()) + } + + tag := fmt.Sprintf("_pullthrough_dep_%s", dgst.Hex()[0:6]) + + is, err := destRepo.getImageStream() + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + // create image stream + stream := imageapi.ImageStream{ + ObjectMeta: kapi.ObjectMeta{ + Name: destRepo.name, + }, + } + context.GetLogger(ctx).Infof("creating image stream to hold pullthrough source image %q for blob %q", ref.Exact(), dgst.String()) + is, err = destRepo.registryOSClient.ImageStreams(destRepo.namespace).Create(&stream) + if kerrors.IsAlreadyExists(err) { + is, err = destRepo.getImageStream() + if err != nil { + return err + } + } + } + + _, err = imageapi.ResolveImageID(is, ref.ID) + if err == nil { + context.GetLogger(ctx).Debugf("source image %s is already referenced in image stream", ref.ID) + return err + } + + // TODO: there's a danger of creating several similar tags for different blobs during a single image push + context.GetLogger(ctx).Infof("creating istag %s:%s referencing image %q", destRepo.Named().Name(), tag, ref.ID) + return destRepo.Tags(ctx).Tag(ctx, tag, distribution.Descriptor{Digest: digest.Digest(ref.ID)}) } diff --git a/pkg/dockerregistry/server/pullthroughblobstore.go b/pkg/dockerregistry/server/pullthroughblobstore.go index c54d4b072d54..482524074950 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore.go +++ b/pkg/dockerregistry/server/pullthroughblobstore.go @@ -138,6 +138,9 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear return distribution.Descriptor{}, distribution.ErrBlobUnknown } + // 
if not nil, higher level asks us to set image reference if found + pullthroughSourceImage, _ := PullthroughSourceImageReferenceFrom(ctx) + // see if any of the previously located repositories containing this digest are in this // image stream for _, repo := range cachedLayers { @@ -151,6 +154,9 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear continue } context.GetLogger(r.repo.ctx).Infof("Found digest location from cache %q in %q: %v", dgst, repo, err) + if pullthroughSourceImage != nil { + *pullthroughSourceImage = *ref + } return desc, nil } @@ -162,6 +168,9 @@ func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, sear } r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo) context.GetLogger(r.repo.ctx).Infof("Found digest location by search %q in %q: %v", dgst, repo, err) + if pullthroughSourceImage != nil { + *pullthroughSourceImage = *ref + } return desc, nil } diff --git a/pkg/dockerregistry/server/util.go b/pkg/dockerregistry/server/util.go index 9f5fdfd3e585..be65ea0fba9d 100644 --- a/pkg/dockerregistry/server/util.go +++ b/pkg/dockerregistry/server/util.go @@ -18,6 +18,8 @@ import ( const ( // repositoryKey serves to store/retrieve repository object to/from context. 
repositoryKey = "openshift.repository" + + pullthroughSourceImageReferenceKey = "openshift.pullthrough.sourceImage" ) func WithRepository(parent context.Context, repo *repository) context.Context { @@ -27,6 +29,13 @@ func RepositoryFrom(ctx context.Context) (repo *repository, found bool) { repo, found = ctx.Value(repositoryKey).(*repository) return } +func WithPullthroughSourceImageReference(parent context.Context, ref *imageapi.DockerImageReference) context.Context { + return context.WithValue(parent, pullthroughSourceImageReferenceKey, ref) +} +func PullthroughSourceImageReferenceFrom(ctx context.Context) (ref *imageapi.DockerImageReference, found bool) { + ref, found = ctx.Value(pullthroughSourceImageReferenceKey).(*imageapi.DockerImageReference) + return +} func getOptionValue( envVar string, diff --git a/test/end-to-end/core.sh b/test/end-to-end/core.sh index 7fc13efa4224..a9f2fa917925 100755 --- a/test/end-to-end/core.sh +++ b/test/end-to-end/core.sh @@ -256,6 +256,8 @@ os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '$ os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/hello-world'" "202 Accepted" # 201 means that blob has been cross mounted from given repository os::cmd::expect_success_and_text "curl -I -X POST -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/uploads/?mount=$rubyimageblob&from=cache/ruby-22-centos7'" "201 Created" +os::cmd::expect_success_and_text "oc get -n crossmount istag repo:_pullthrough_dep_${rubyimageblob:7:6}" "$rubyimagedigest" +os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "200 OK" # check that the blob is linked now os::cmd::expect_success_and_text "curl -I -X HEAD -u 'pusher:${pusher_token}' '${DOCKER_REGISTRY}/v2/crossmount/repo/blobs/$rubyimageblob'" "200 OK" # remove 
pusher's permissions to read from the source repository