diff --git a/pkg/dockerregistry/server/blobdescriptorservice.go b/pkg/dockerregistry/server/blobdescriptorservice.go index aa1bc48433d8..995ab91f3dc1 100644 --- a/pkg/dockerregistry/server/blobdescriptorservice.go +++ b/pkg/dockerregistry/server/blobdescriptorservice.go @@ -53,6 +53,7 @@ type blobDescriptorService struct { // corresponding image stream. This method is invoked from inside of upstream's linkedBlobStore. It expects // a proper repository object to be set on given context by upper openshift middleware wrappers. func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + context.GetLogger(ctx).Debugf("(*blobDescriptorService).Stat: starting with digest=%s", dgst.String()) repo, found := RepositoryFrom(ctx) if !found || repo == nil { err := fmt.Errorf("failed to retrieve repository from context") @@ -71,28 +72,27 @@ func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) ( return desc, nil } - context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), err) + context.GetLogger(ctx).Debugf("(*blobDescriptorService).Stat: could not stat layer link %s in repository %s: %v", dgst.String(), repo.Named().Name(), err) // First attempt: looking for the blob locally desc, err = dockerRegistry.BlobStatter().Stat(ctx, dgst) if err == nil { + context.GetLogger(ctx).Debugf("(*blobDescriptorService).Stat: blob %s exists in the global blob store", dgst.String()) // only non-empty layers is wise to check for existence in the image stream. // schema v2 has no empty layers. if !isEmptyDigest(dgst) { // ensure it's referenced inside of corresponding image stream if !imageStreamHasBlob(repo, dgst) { + context.GetLogger(ctx).Debugf("(*blobDescriptorService).Stat: blob %s is neither empty nor referenced in image stream %s", dgst.String(), repo.Named().Name()) return distribution.Descriptor{}, distribution.ErrBlobUnknown } } return desc, nil } - if err == distribution.ErrBlobUnknown { + if err == distribution.ErrBlobUnknown && RemoteBlobAccessCheckEnabledFrom(ctx) { // Second attempt: looking for the blob on a remote server - remoteGetter, found := RemoteBlobGetterFrom(ctx) - if found { - desc, err = remoteGetter.Stat(ctx, dgst) - } + desc, err = repo.remoteBlobGetter.Stat(ctx, dgst) } return desc, err @@ -136,7 +136,7 @@ func imageStreamHasBlob(r *repository, dgst digest.Digest) bool { } // verify directly with etcd - is, err := r.getImageStream() + is, err := r.imageStreamGetter.get() if err != nil { context.GetLogger(r.ctx).Errorf("failed to get image stream: %v", err) return logFound(false) diff --git a/pkg/dockerregistry/server/blobdescriptorservice_test.go b/pkg/dockerregistry/server/blobdescriptorservice_test.go index 6776cc761bca..80896e3f15b1 100644 --- a/pkg/dockerregistry/server/blobdescriptorservice_test.go +++ b/pkg/dockerregistry/server/blobdescriptorservice_test.go @@ -32,19 +32,28 @@ import ( imagetest "github.com/openshift/origin/pkg/image/admission/testutil" ) +const testPassthroughToUpstream = "openshift.test.passthrough-to-upstream" + +func WithTestPassthroughToUpstream(ctx context.Context, passthrough bool) context.Context { + return context.WithValue(ctx, testPassthroughToUpstream, passthrough) +} + +func GetTestPassThroughToUpstream(ctx context.Context) bool { + passthrough, found := ctx.Value(testPassthroughToUpstream).(bool) + return found && passthrough +} + // TestBlobDescriptorServiceIsApplied ensures that blobDescriptorService middleware gets applied. // It relies on the fact that blobDescriptorService requires higher levels to set repository object on given // context. If the object isn't given, its method will err out. func TestBlobDescriptorServiceIsApplied(t *testing.T) { - ctx := context.Background() - // don't do any authorization check installFakeAccessController(t) m := fakeBlobDescriptorService(t) // to make other unit tests working defer m.changeUnsetRepository(false) - testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, true) + testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, "", true) if err != nil { t.Fatal(err) } @@ -61,7 +70,7 @@ func TestBlobDescriptorServiceIsApplied(t *testing.T) { DefaultRegistryClient = backupRegistryClient }() - app := handlers.NewApp(ctx, &configuration.Configuration{ + app := handlers.NewApp(context.Background(), &configuration.Configuration{ Loglevel: "debug", Auth: map[string]configuration.Parameters{ fakeAuthorizerName: {"realm": fakeAuthorizerName}, @@ -74,6 +83,11 @@ func TestBlobDescriptorServiceIsApplied(t *testing.T) { "delete": configuration.Parameters{ "enabled": true, }, + "maintenance": configuration.Parameters{ + "uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }, + }, }, Middleware: map[string][]configuration.Middleware{ "registry": {{Name: "openshift"}}, @@ -90,12 +104,12 @@ func TestBlobDescriptorServiceIsApplied(t *testing.T) { } os.Setenv("DOCKER_REGISTRY_URL", serverURL.Host) - desc, _, err := registrytest.UploadTestBlob(serverURL, nil, "user/app") + desc, _, err := registrytest.UploadRandomTestBlob(serverURL, nil, "user/app") if err != nil { t.Fatal(err) } - for _, tc := range []struct { + type testCase struct { name string method string endpoint string @@ -103,7 +117,70 @@ func TestBlobDescriptorServiceIsApplied(t *testing.T) { unsetRepository bool expectedStatus int expectedMethodInvocations map[string]int - }{ + } + + doTest := func(tc testCase) { + m.clearStats() + m.changeUnsetRepository(tc.unsetRepository) + + route := router.GetRoute(tc.endpoint).Host(serverURL.Host) + u, err := route.URL(tc.vars...) + if err != nil { + t.Errorf("[%s] failed to build route: %v", tc.name, err) + return + } + + req, err := http.NewRequest(tc.method, u.String(), nil) + if err != nil { + t.Errorf("[%s] failed to make request: %v", tc.name, err) + } + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Errorf("[%s] failed to do the request: %v", tc.name, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != tc.expectedStatus { + t.Errorf("[%s] unexpected status code: %v != %v", tc.name, resp.StatusCode, tc.expectedStatus) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + content, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("[%s] failed to read body: %v", tc.name, err) + } else if len(content) > 0 { + errs := errcode.Errors{} + err := errs.UnmarshalJSON(content) + if err != nil { + t.Logf("[%s] failed to parse body as error: %v", tc.name, err) + t.Logf("[%s] received body: %v", tc.name, string(content)) + } else { + t.Logf("[%s] received errors: %#+v", tc.name, errs) + } + } + } + + stats, err := m.getStats(tc.expectedMethodInvocations, time.Second*5) + if err != nil { + t.Fatalf("[%s] failed to get stats: %v", tc.name, err) + } + for method, exp := range tc.expectedMethodInvocations { + invoked := stats[method] + if invoked != exp { + t.Errorf("[%s] unexpected number of invocations of method %q: %v != %v", tc.name, method, invoked, exp) + } + } + for method, invoked := range stats { + if _, ok := tc.expectedMethodInvocations[method]; !ok { + t.Errorf("[%s] unexpected method %q invoked %d times", tc.name, method, invoked) + } + } + } + + for _, tc := range []testCase{ { name: "get blob with repository unset", method: http.MethodGet, @@ -190,120 +267,61 @@ func TestBlobDescriptorServiceIsApplied(t *testing.T) { }, { - name: "get manifest with repository unset", - method: http.MethodGet, + name: "delete manifest with repository unset", + method: http.MethodDelete, endpoint: v2.RouteNameManifest, vars: []string{ "name", "user/app", - "reference", "latest", + "reference", testImage.Name, }, unsetRepository: true, - // failed because we trying to get manifest from storage driver first. - expectedStatus: http.StatusNotFound, - // manifest can't be retrieved from etcd + expectedStatus: http.StatusInternalServerError, + // we don't allow to delete manifests from etcd; in this case, we attempt to delete layer link expectedMethodInvocations: map[string]int{"Stat": 1}, }, { - name: "get manifest", - method: http.MethodGet, + name: "delete manifest", + method: http.MethodDelete, endpoint: v2.RouteNameManifest, vars: []string{ "name", "user/app", - "reference", "latest", + "reference", testImage.Name, }, - expectedStatus: http.StatusOK, - // manifest is retrieved from etcd - expectedMethodInvocations: map[string]int{"Stat": 3}, + expectedStatus: http.StatusNotFound, + // we don't allow to delete manifests from etcd; in this case, we attempt to delete layer link + expectedMethodInvocations: map[string]int{"Stat": 1}, }, { - name: "delete manifest with repository unset", - method: http.MethodDelete, + name: "get manifest with repository unset", + method: http.MethodGet, endpoint: v2.RouteNameManifest, vars: []string{ "name", "user/app", - "reference", testImage.Name, + "reference", "latest", }, unsetRepository: true, - expectedStatus: http.StatusInternalServerError, - // we don't allow to delete manifests from etcd; in this case, we attempt to delete layer link + // failed because we trying to get manifest from storage driver first. + expectedStatus: http.StatusNotFound, + // manifest can't be retrieved from etcd expectedMethodInvocations: map[string]int{"Stat": 1}, }, { - name: "delete manifest", - method: http.MethodDelete, + name: "get manifest", + method: http.MethodGet, endpoint: v2.RouteNameManifest, vars: []string{ "name", "user/app", - "reference", testImage.Name, + "reference", "latest", }, - expectedStatus: http.StatusNotFound, - // we don't allow to delete manifests from etcd; in this case, we attempt to delete layer link + expectedStatus: http.StatusOK, + // manifest is retrieved from etcd expectedMethodInvocations: map[string]int{"Stat": 1}, }, } { - m.clearStats() - m.changeUnsetRepository(tc.unsetRepository) - - route := router.GetRoute(tc.endpoint).Host(serverURL.Host) - u, err := route.URL(tc.vars...) - if err != nil { - t.Errorf("[%s] failed to build route: %v", tc.name, err) - continue - } - - req, err := http.NewRequest(tc.method, u.String(), nil) - if err != nil { - t.Errorf("[%s] failed to make request: %v", tc.name, err) - continue - } - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - t.Errorf("[%s] failed to do the request: %v", tc.name, err) - continue - } - defer resp.Body.Close() - - if resp.StatusCode != tc.expectedStatus { - t.Errorf("[%s] unexpected status code: %v != %v", tc.name, resp.StatusCode, tc.expectedStatus) - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { - content, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Errorf("[%s] failed to read body: %v", tc.name, err) - } else if len(content) > 0 { - errs := errcode.Errors{} - err := errs.UnmarshalJSON(content) - if err != nil { - t.Logf("[%s] failed to parse body as error: %v", tc.name, err) - t.Logf("[%s] received body: %v", tc.name, string(content)) - } else { - t.Logf("[%s] received errors: %#+v", tc.name, errs) - } - } - } - - stats, err := m.getStats(tc.expectedMethodInvocations, time.Second*5) - if err != nil { - t.Errorf("[%s] failed to get stats: %v", tc.name, err) - continue - } - for method, exp := range tc.expectedMethodInvocations { - invoked := stats[method] - if invoked != exp { - t.Errorf("[%s] unexpected number of invocations of method %q: %v != %v", tc.name, method, invoked, exp) - } - } - for method, invoked := range stats { - if _, ok := tc.expectedMethodInvocations[method]; !ok { - t.Errorf("[%s] unexpected method %q invoked %d times", tc.name, method, invoked) - } - } + doTest(tc) } } @@ -366,30 +384,38 @@ func (m *testBlobDescriptorManager) changeUnsetRepository(unset bool) { // collected numbers of invocations per each method watched. An error will be returned if a given timeout is // reached without satisfying minimum limit.s func (m *testBlobDescriptorManager) getStats(minimumLimits map[string]int, timeout time.Duration) (map[string]int, error) { - m.mu.Lock() - defer m.mu.Unlock() - - var err error end := time.Now().Add(timeout) + stats := make(map[string]int) + + if len(minimumLimits) == 0 { + m.mu.Lock() + for k, v := range m.stats { + stats[k] = v + } + m.mu.Unlock() + return stats, nil + } + + c := make(chan struct{}) + go func() { + m.mu.Lock() + defer m.mu.Unlock() - if len(minimumLimits) > 0 { - Loop: for !statsGreaterThanOrEqual(m.stats, minimumLimits) { - c := make(chan struct{}) - go func() { m.cond.Wait(); c <- struct{}{} }() - - now := time.Now() - select { - case <-time.After(end.Sub(now)): - err = fmt.Errorf("timeout while waiting on expected stats") - break Loop - case <-c: - continue Loop - } + m.cond.Wait() } + c <- struct{}{} + }() + + var err error + select { + case <-time.After(end.Sub(time.Now())): + err = fmt.Errorf("timeout while waiting on expected stats") + case <-c: } - stats := make(map[string]int) + m.mu.Lock() + defer m.mu.Unlock() for k, v := range m.stats { stats[k] = v } @@ -495,3 +521,42 @@ func (f *fakeRegistryClient) Clients() (osclient.Interface, kclientset.Interface func (f *fakeRegistryClient) SafeClientConfig() restclient.Config { return (®istryClient{}).SafeClientConfig() } + +// passthroughBlobDescriptorService passes all Stat and Clear requests to +// custom blobDescriptorService by default. If +// "openshift.test.passthrough-to-upstream" is set on context with value +// "true", all the requests will be passed straight to the upstream blob +// descriptor service. +type passthroughBlobDescriptorService struct { + distribution.BlobDescriptorService +} + +func (pbds *passthroughBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + passthrough := GetTestPassThroughToUpstream(ctx) + if passthrough { + context.GetLogger(ctx).Debugf("(*passthroughBlobDescriptorService).Stat: passing down to upstream blob descriptor service") + return pbds.BlobDescriptorService.Stat(ctx, dgst) + } + context.GetLogger(ctx).Debugf("(*passthroughBlobDescriptorService).Stat: passing to openshift wrapper") + return (&blobDescriptorServiceFactory{}).BlobAccessController(pbds.BlobDescriptorService).Stat(ctx, dgst) +} + +func (pbds *passthroughBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error { + passthrough := GetTestPassThroughToUpstream(ctx) + if passthrough { + context.GetLogger(ctx).Debugf("(*passthroughBlobDescriptorService).Clear: passing down to upstream blob descriptor service") + return pbds.BlobDescriptorService.Clear(ctx, dgst) + } + context.GetLogger(ctx).Debugf("(*passthroughBlobDescriptorService).Clear: passing to openshift wrapper") + return (&blobDescriptorServiceFactory{}).BlobAccessController(pbds.BlobDescriptorService).Clear(ctx, dgst) +} + +type passthroughBlobDescriptorServiceFactory struct{} + +func (pbf *passthroughBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService { + return &passthroughBlobDescriptorService{svc} +} + +func setPassthroughBlobDescriptorServiceFactory() { + middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&passthroughBlobDescriptorServiceFactory{})) +} diff --git a/pkg/dockerregistry/server/manifestservice.go b/pkg/dockerregistry/server/manifestservice.go index 29039e8343f6..cf6ebf6eb4c5 100644 --- a/pkg/dockerregistry/server/manifestservice.go +++ b/pkg/dockerregistry/server/manifestservice.go @@ -10,6 +10,7 @@ import ( "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/registry/api/errcode" regapi "github.com/docker/distribution/registry/api/v2" kapi "k8s.io/kubernetes/pkg/api" @@ -34,7 +35,7 @@ type manifestService struct { func (m *manifestService) Exists(ctx context.Context, dgst digest.Digest) (bool, error) { context.GetLogger(ctx).Debugf("(*manifestService).Exists") - image, err := m.repo.getImage(dgst) + image, _, err := m.repo.getImageOfImageStream(dgst) if err != nil { return false, err } @@ -45,14 +46,8 @@ func (m *manifestService) Exists(ctx context.Context, dgst digest.Digest) (bool, func (m *manifestService) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { context.GetLogger(ctx).Debugf("(*manifestService).Get") - if _, err := m.repo.getImageStreamImage(dgst); err != nil { - context.GetLogger(ctx).Errorf("error retrieving ImageStreamImage %s/%s@%s: %v", m.repo.namespace, m.repo.name, dgst.String(), err) - return nil, err - } - - image, err := m.repo.getImage(dgst) + image, _, err := m.repo.getImageOfImageStream(dgst) if err != nil { - context.GetLogger(ctx).Errorf("error retrieving image %s: %v", dgst.String(), err) return nil, err } @@ -186,22 +181,12 @@ func (m *manifestService) Put(ctx context.Context, manifest distribution.Manifes return "", err } - stream := imageapi.ImageStream{} - stream.Name = m.repo.name - - uclient, ok := UserClientFrom(m.ctx) - if !ok { - context.GetLogger(ctx).Errorf("error creating user client to auto provision image stream: Origin user client unavailable") - return "", statusErr - } - - if _, err := uclient.ImageStreams(m.repo.namespace).Create(&stream); err != nil { - if quotautil.IsErrorQuotaExceeded(err) { - context.GetLogger(ctx).Errorf("denied creating ImageStream: %v", err) - return "", distribution.ErrAccessDenied + if _, err := m.repo.createImageStream(ctx); err != nil { + if e, ok := err.(errcode.Error); ok && e.ErrorCode() == errcode.ErrorCodeUnknown { + // TODO: convert statusErr to distribution error + return "", statusErr } - context.GetLogger(ctx).Errorf("error auto provisioning ImageStream: %s", err) - return "", statusErr + return "", err } // try to create the ISM again diff --git a/pkg/dockerregistry/server/pullthroughblobstore.go b/pkg/dockerregistry/server/pullthroughblobstore.go index 9437ea009846..87e7381802f7 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore.go +++ b/pkg/dockerregistry/server/pullthroughblobstore.go @@ -24,9 +24,10 @@ var _ distribution.BlobStore = &pullthroughBlobStore{} // Stat makes a local check for the blob, then falls through to the other servers referenced by // the image stream and looks for those that have the layer. -func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { +func (pbs *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + context.GetLogger(ctx).Debugf("(*pullthroughBlobStore).Stat: starting with dgst=%s", dgst.String()) // check the local store for the blob - desc, err := r.BlobStore.Stat(ctx, dgst) + desc, err := pbs.BlobStore.Stat(ctx, dgst) switch { case err == distribution.ErrBlobUnknown: // continue on to the code below and look up the blob in a remote store since it is not in @@ -38,13 +39,7 @@ func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (di return desc, err } - remoteGetter, found := RemoteBlobGetterFrom(r.repo.ctx) - if !found { - context.GetLogger(ctx).Errorf("pullthroughBlobStore.Stat: failed to retrieve remote getter from context") - return distribution.Descriptor{}, distribution.ErrBlobUnknown - } - - return remoteGetter.Stat(ctx, dgst) + return pbs.repo.remoteBlobGetter.Stat(ctx, dgst) } // ServeBlob attempts to serve the requested digest onto w, using a remote proxy store if necessary. @@ -53,6 +48,7 @@ func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (di // success response with no actual body content. // [1] https://docs.docker.com/registry/spec/api/#existing-layers func (pbs *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error { + context.GetLogger(ctx).Debugf("(*pullthroughBlobStore).ServeBlob: starting with dgst=%s", dgst.String()) // This call should be done without BlobGetterService in the context. err := pbs.BlobStore.ServeBlob(ctx, w, req, dgst) switch { @@ -66,11 +62,7 @@ func (pbs *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseW return err } - remoteGetter, found := RemoteBlobGetterFrom(pbs.repo.ctx) - if !found { - context.GetLogger(ctx).Errorf("pullthroughBlobStore.ServeBlob: failed to retrieve remote getter from context") - return distribution.ErrBlobUnknown - } + remoteGetter := pbs.repo.remoteBlobGetter // store the content locally if requested, but ensure only one instance at a time // is storing to avoid excessive local writes @@ -99,19 +91,14 @@ func (pbs *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseW } // Get attempts to fetch the requested blob by digest using a remote proxy store if necessary. -func (r *pullthroughBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { - data, originalErr := r.BlobStore.Get(ctx, dgst) +func (pbs *pullthroughBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + context.GetLogger(ctx).Debugf("(*pullthroughBlobStore).Get: starting with dgst=%s", dgst.String()) + data, originalErr := pbs.BlobStore.Get(ctx, dgst) if originalErr == nil { return data, nil } - remoteGetter, found := RemoteBlobGetterFrom(r.repo.ctx) - if !found { - context.GetLogger(ctx).Errorf("pullthroughBlobStore.Get: failed to retrieve remote getter from context") - return nil, originalErr - } - - return remoteGetter.Get(ctx, dgst) + return pbs.repo.remoteBlobGetter.Get(ctx, dgst) } // setResponseHeaders sets the appropriate content serving headers diff --git a/pkg/dockerregistry/server/pullthroughblobstore_test.go b/pkg/dockerregistry/server/pullthroughblobstore_test.go index 59f9408f7c75..2df1fd19638e 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore_test.go +++ b/pkg/dockerregistry/server/pullthroughblobstore_test.go @@ -13,12 +13,9 @@ import ( "time" "github.com/docker/distribution" - "github.com/docker/distribution/configuration" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest/schema1" - //"github.com/docker/distribution/registry/api/v2" - "github.com/docker/distribution/registry/handlers" _ "github.com/docker/distribution/registry/storage/driver/inmemory" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" @@ -30,11 +27,12 @@ import ( ) func TestPullthroughServeBlob(t *testing.T) { - ctx := context.Background() - + namespace, name := "user", "app" + repoName := fmt.Sprintf("%s/%s", namespace, name) installFakeAccessController(t) + setPassthroughBlobDescriptorServiceFactory() - testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, false) + testImage, err := registrytest.NewImageForManifest(repoName, registrytest.SampleImageManifestSchema1, "", false) if err != nil { t.Fatal(err) } @@ -49,28 +47,7 @@ func TestPullthroughServeBlob(t *testing.T) { DefaultRegistryClient = backupRegistryClient }() - // pullthrough middleware will attempt to pull from this registry instance - remoteRegistryApp := handlers.NewApp(ctx, &configuration.Configuration{ - Loglevel: "debug", - Auth: map[string]configuration.Parameters{ - fakeAuthorizerName: {"realm": fakeAuthorizerName}, - }, - Storage: configuration.Storage{ - "inmemory": configuration.Parameters{}, - "cache": configuration.Parameters{ - "blobdescriptor": "inmemory", - }, - "delete": configuration.Parameters{ - "enabled": true, - }, - }, - Middleware: map[string][]configuration.Middleware{ - "registry": {{Name: "openshift"}}, - "repository": {{Name: "openshift", Options: configuration.Parameters{"pullthrough": false}}}, - "storage": {{Name: "openshift"}}, - }, - }) - remoteRegistryServer := httptest.NewServer(remoteRegistryApp) + remoteRegistryServer := createTestRegistryServer(t, context.Background()) defer remoteRegistryServer.Close() serverURL, err := url.Parse(remoteRegistryServer.URL) @@ -78,9 +55,9 @@ func TestPullthroughServeBlob(t *testing.T) { t.Fatalf("error parsing server url: %v", err) } os.Setenv("DOCKER_REGISTRY_URL", serverURL.Host) - testImage.DockerImageReference = fmt.Sprintf("%s/%s@%s", serverURL.Host, "user/app", testImage.Name) + testImage.DockerImageReference = fmt.Sprintf("%s/%s@%s", serverURL.Host, repoName, testImage.Name) - testImageStream := registrytest.TestNewImageStreamObject("user", "app", "latest", testImage.Name, testImage.DockerImageReference) + testImageStream := registrytest.TestNewImageStreamObject(namespace, name, "latest", testImage.Name, testImage.DockerImageReference) if testImageStream.Annotations == nil { testImageStream.Annotations = make(map[string]string) } @@ -88,11 +65,11 @@ func TestPullthroughServeBlob(t *testing.T) { client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, *testImageStream)) - blob1Desc, blob1Content, err := registrytest.UploadTestBlob(serverURL, nil, "user/app") + blob1Desc, blob1Content, err := registrytest.UploadRandomTestBlob(serverURL, nil, repoName) if err != nil { t.Fatal(err) } - blob2Desc, blob2Content, err := registrytest.UploadTestBlob(serverURL, nil, "user/app") + blob2Desc, blob2Content, err := registrytest.UploadRandomTestBlob(serverURL, nil, repoName) if err != nil { t.Fatal(err) } @@ -171,25 +148,393 @@ func TestPullthroughServeBlob(t *testing.T) { } { localBlobStore := newTestBlobStore(tc.localBlobs) - cachedLayers, err := newDigestToRepositoryCache(10) + ctx := WithTestPassthroughToUpstream(context.Background(), false) + repo := newTestRepositoryForPullthrough(t, ctx, nil, namespace, name, client, true) + ptbs := &pullthroughBlobStore{ + BlobStore: localBlobStore, + repo: repo, + } + + req, err := http.NewRequest(tc.method, fmt.Sprintf("http://example.org/v2/user/app/blobs/%s", tc.blobDigest), nil) if err != nil { - t.Fatal(err) + t.Fatalf("[%s] failed to create http request: %v", tc.name, err) } - repo := &repository{ - ctx: ctx, - namespace: "user", - name: "app", - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, + w := httptest.NewRecorder() + + dgst := digest.Digest(tc.blobDigest) + + _, err = ptbs.Stat(ctx, dgst) + if err != tc.expectedStatError { + t.Errorf("[%s] Stat returned unexpected error: %#+v != %#+v", tc.name, err, tc.expectedStatError) + } + if err != nil || tc.expectedStatError != nil { + continue + } + err = ptbs.ServeBlob(ctx, w, req, dgst) + if err != nil { + t.Errorf("[%s] unexpected ServeBlob error: %v", tc.name, err) + continue } - rbs := &remoteBlobGetterService{ - repo: repo, - digestToStore: make(map[string]distribution.BlobStore), + clstr := w.Header().Get("Content-Length") + if cl, err := strconv.ParseInt(clstr, 10, 64); err != nil { + t.Errorf(`[%s] unexpected Content-Length: %q != "%d"`, tc.name, clstr, tc.expectedContentLength) + } else { + if cl != tc.expectedContentLength { + t.Errorf("[%s] Content-Length does not match expected size: %d != %d", tc.name, cl, tc.expectedContentLength) + } + } + if w.Header().Get("Content-Type") != "application/octet-stream" { + t.Errorf("[%s] Content-Type does not match expected: %q != %q", tc.name, w.Header().Get("Content-Type"), "application/octet-stream") } - ctx = WithRemoteBlobGetter(ctx, rbs) + body := w.Body.Bytes() + if int64(len(body)) != tc.expectedBytesServed { + t.Errorf("[%s] unexpected size of body: %d != %d", tc.name, len(body), tc.expectedBytesServed) + } + + for name, expCount := range tc.expectedLocalCalls { + count := localBlobStore.calls[name] + if count != expCount { + t.Errorf("[%s] expected %d calls to method %s of local blob store, not %d", tc.name, expCount, name, count) + } + } + for name, count := range localBlobStore.calls { + if _, exists := tc.expectedLocalCalls[name]; !exists { + t.Errorf("[%s] expected no calls to method %s of local blob store, got %d", tc.name, name, count) + } + } + + if localBlobStore.bytesServed != tc.expectedBytesServedLocally { + t.Errorf("[%s] unexpected number of bytes served locally: %d != %d", tc.name, localBlobStore.bytesServed, tc.expectedBytesServed) + } + } +} + +func TestPullthroughServeBlobInsecure(t *testing.T) { + namespace := "user" + repo1 := "app1" + repo2 := "app2" + repo1Name := fmt.Sprintf("%s/%s", namespace, repo1) + repo2Name := fmt.Sprintf("%s/%s", namespace, repo2) + + installFakeAccessController(t) + setPassthroughBlobDescriptorServiceFactory() + + remoteRegistryServer := createTestRegistryServer(t, context.Background()) + defer remoteRegistryServer.Close() + + serverURL, err := url.Parse(remoteRegistryServer.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + + m1dgst, m1canonical, m1cfg, m1manifest, err := registrytest.CreateAndUploadTestManifest( + registrytest.ManifestSchema2, 2, serverURL, nil, repo1Name, "foo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _, m1payload, err := m1manifest.Payload() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("m1dgst=%s, m1manifest: %s", m1dgst, m1canonical) + m2dgst, m2canonical, m2cfg, m2manifest, err := registrytest.CreateAndUploadTestManifest( + registrytest.ManifestSchema2, 2, serverURL, nil, repo2Name, "bar") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _, m2payload, err := m2manifest.Payload() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("m2dgst=%s, m2manifest: %s", m2dgst, m2canonical) + + m1img, err := registrytest.NewImageForManifest(repo1Name, string(m1payload), m1cfg, false) + if err != nil { + t.Fatal(err) + } + m1img.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo1, m1img.Name) + m1img.DockerImageManifest = "" + m2img, err := registrytest.NewImageForManifest(repo2Name, string(m2payload), m2cfg, false) + if err != nil { + t.Fatal(err) + } + m2img.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo2, m2img.Name) + m2img.DockerImageManifest = "" + + for _, tc := range []struct { + name string + method string + blobDigest digest.Digest + localBlobs map[digest.Digest][]byte + imageStreamInit func(client *testclient.Fake) *imageapi.ImageStream + expectedStatError error + expectedContentLength int64 + expectedBytesServed int64 + expectedBytesServedLocally int64 + expectedLocalCalls map[string]int + }{ + { + name: "stat remote blob with insecure repository", + method: "HEAD", + blobDigest: digest.Digest(m1img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1dgst.String(), m1img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "true"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedContentLength: int64(m1img.DockerImageLayers[0].LayerSize), + expectedLocalCalls: map[string]int{"Stat": 1, "ServeBlob": 1}, + }, + + { + name: "serve remote blob with insecure repository", + method: "GET", + blobDigest: digest.Digest(m1img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1dgst.String(), m1img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "true"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedContentLength: int64(m1img.DockerImageLayers[0].LayerSize), + expectedBytesServed: int64(m1img.DockerImageLayers[0].LayerSize), + expectedLocalCalls: map[string]int{"Stat": 1, "ServeBlob": 1}, + }, + + { + name: "stat remote blob with secure repository", + method: "HEAD", + blobDigest: digest.Digest(m1img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1dgst.String(), m1img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedStatError: distribution.ErrBlobUnknown, + }, + + { + name: "serve remote blob with secure repository", + method: "GET", + blobDigest: digest.Digest(m1img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1dgst.String(), m1img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedStatError: distribution.ErrBlobUnknown, + }, + + { + name: "stat remote blob with with insecure tag", + method: "HEAD", + blobDigest: digest.Digest(m2img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img, *m2img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1dgst.String(), m1img.DockerImageReference) + is.Status.Tags["tag2"] = imageapi.TagEventList{ + Items: []imageapi.TagEvent{ + { + Image: m2img.Name, + DockerImageReference: m2img.DockerImageReference, + }, + }, + } + is.Spec.Tags = map[string]imageapi.TagReference{ + "tag1": { + Name: "tag1", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + "tag2": { + Name: "tag2", + ImportPolicy: imageapi.TagImportPolicy{Insecure: true}, + }, + } + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedContentLength: int64(m2img.DockerImageLayers[0].LayerSize), + expectedLocalCalls: map[string]int{"Stat": 1, "ServeBlob": 1}, + }, + + { + name: "serve remote blob with insecure tag", + method: "GET", + blobDigest: digest.Digest(m2img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img, *m2img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1img.Name, m1img.DockerImageReference) + is.Status.Tags["tag2"] = imageapi.TagEventList{ + Items: []imageapi.TagEvent{ + { + Image: m2img.Name, + DockerImageReference: m2img.DockerImageReference, + }, + }, + } + is.Spec.Tags = map[string]imageapi.TagReference{ + "tag1": { + Name: "tag1", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + "tag2": { + Name: "tag2", + ImportPolicy: imageapi.TagImportPolicy{Insecure: true}, + }, + } + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedLocalCalls: map[string]int{"Stat": 1, "ServeBlob": 1}, + expectedContentLength: int64(m2img.DockerImageLayers[0].LayerSize), + expectedBytesServed: int64(m2img.DockerImageLayers[0].LayerSize), + }, + + { + name: "insecure flag propagates to all repositories of the registry", + method: "GET", + blobDigest: digest.Digest(m2img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img, *m2img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1img.Name, m1img.DockerImageReference) + is.Status.Tags["tag2"] = imageapi.TagEventList{ + Items: []imageapi.TagEvent{ + { + Image: m2img.Name, + DockerImageReference: m2img.DockerImageReference, + }, + }, + } + is.Spec.Tags = map[string]imageapi.TagReference{ + "tag1": { + Name: "tag1", + // This value will propagate to the other tag as well. + ImportPolicy: imageapi.TagImportPolicy{Insecure: true}, + }, + "tag2": { + Name: "tag2", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + } + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedLocalCalls: map[string]int{"Stat": 1, "ServeBlob": 1}, + expectedContentLength: int64(m2img.DockerImageLayers[0].LayerSize), + expectedBytesServed: int64(m2img.DockerImageLayers[0].LayerSize), + }, + + { + name: "serve remote blob with secure tag", + method: "GET", + blobDigest: digest.Digest(m1img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img, *m2img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1dgst.String(), m1img.DockerImageReference) + ref, err := imageapi.ParseDockerImageReference(m2img.DockerImageReference) + if err != nil { + t.Fatal(err) + } + // The two references must differ because all repositories of particular registry are + // considered insecure if there's at least one insecure flag for the registry. + ref.Registry = "docker.io" + is.Status.Tags["tag2"] = imageapi.TagEventList{ + Items: []imageapi.TagEvent{ + { + Image: m2img.Name, + DockerImageReference: ref.DockerClientDefaults().Exact(), + }, + }, + } + is.Spec.Tags = map[string]imageapi.TagReference{ + "tag1": { + Name: "tag1", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + "tag2": { + Name: "tag2", + ImportPolicy: imageapi.TagImportPolicy{Insecure: true}, + }, + } + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedStatError: distribution.ErrBlobUnknown, + }, + + { + name: "serve remote blob with 2 tags pointing to the same image", + method: "GET", + blobDigest: digest.Digest(m1img.DockerImageLayers[0].Name), + imageStreamInit: func(client *testclient.Fake) *imageapi.ImageStream { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *m1img, *m2img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo1, "tag1", m1img.Name, m1img.DockerImageReference) + is.Status.Tags["tag2"] = imageapi.TagEventList{ + Items: []imageapi.TagEvent{ + { + Image: m1img.Name, + DockerImageReference: m1img.DockerImageReference, + }, + }, + } + is.Spec.Tags = map[string]imageapi.TagReference{ + "tag1": { + Name: "tag1", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + "tag2": { + Name: "tag2", + ImportPolicy: imageapi.TagImportPolicy{Insecure: true}, + }, + } + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return is + }, + expectedLocalCalls: map[string]int{"Stat": 1, "ServeBlob": 1}, + expectedContentLength: int64(m1img.DockerImageLayers[0].LayerSize), + expectedBytesServed: int64(m1img.DockerImageLayers[0].LayerSize), + }, + } { + client := &testclient.Fake{} + + // TODO: get rid of those nasty global vars + backupRegistryClient := DefaultRegistryClient + DefaultRegistryClient = makeFakeRegistryClient(client, fake.NewSimpleClientset()) + defer func() { + // set it back once this test finishes to make other unit tests working again + DefaultRegistryClient = backupRegistryClient + }() + + tc.imageStreamInit(client) + + localBlobStore := newTestBlobStore(tc.localBlobs) + + ctx := WithTestPassthroughToUpstream(context.Background(), false) + + repo := newTestRepositoryForPullthrough(t, ctx, nil, namespace, repo1, client, true) ptbs := &pullthroughBlobStore{ BlobStore: localBlobStore, @@ -206,7 +551,7 @@ func TestPullthroughServeBlob(t *testing.T) { _, err = ptbs.Stat(ctx, dgst) if err != tc.expectedStatError { - t.Errorf("[%s] Stat returned unexpected error: %#+v != %#+v", tc.name, err, tc.expectedStatError) + t.Fatalf("[%s] Stat returned unexpected error: %#+v != %#+v", tc.name, err, tc.expectedStatError) } if err != nil || tc.expectedStatError != nil { continue @@ -252,6 +597,51 @@ func TestPullthroughServeBlob(t *testing.T) { } } +func newTestRepositoryForPullthrough( + t *testing.T, + ctx context.Context, + wrappedRepository distribution.Repository, + namespace, name string, + client *testclient.Fake, + enablePullThrough bool, +) *repository { + cachedLayers, err := newDigestToRepositoryCache(10) + if err != nil { + t.Fatal(err) + } + + isGetter := &cachedImageStreamGetter{ + ctx: ctx, + namespace: namespace, + name: name, + isNamespacer: client, + } + + r := &repository{ + Repository: wrappedRepository, + ctx: ctx, + namespace: namespace, + name: name, + pullthrough: enablePullThrough, + cachedLayers: cachedLayers, + registryOSClient: client, + imageStreamGetter: isGetter, + cachedImages: make(map[digest.Digest]*imageapi.Image), + } + + if enablePullThrough { + r.remoteBlobGetter = NewBlobGetterService( + namespace, + name, + defaultBlobRepositoryCacheTTL, + isGetter.get, + client, + cachedLayers) + } + + return r +} + const ( unknownBlobDigest = "sha256:bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721" ) diff --git a/pkg/dockerregistry/server/pullthroughmanifestservice.go b/pkg/dockerregistry/server/pullthroughmanifestservice.go index 6cc4569c8bf4..026007fb29f9 100644 --- a/pkg/dockerregistry/server/pullthroughmanifestservice.go +++ b/pkg/dockerregistry/server/pullthroughmanifestservice.go @@ -15,13 +15,13 @@ import ( type pullthroughManifestService struct { distribution.ManifestService - repo *repository - pullFromInsecureRegistries bool + repo *repository } var _ distribution.ManifestService = &pullthroughManifestService{} func (m *pullthroughManifestService) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { + context.GetLogger(ctx).Debugf("(*pullthroughManifestService).Get: starting with dgst=%s", dgst.String()) manifest, err := m.ManifestService.Get(ctx, dgst, options...) switch err.(type) { case distribution.ErrManifestUnknownRevision: @@ -36,28 +36,20 @@ func (m *pullthroughManifestService) Get(ctx context.Context, dgst digest.Digest } func (m *pullthroughManifestService) remoteGet(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { - isi, err := m.repo.getImageStreamImage(dgst) + context.GetLogger(ctx).Debugf("(*pullthroughManifestService).remoteGet: starting with dgst=%s", dgst.String()) + image, _, err := m.repo.getImageOfImageStream(dgst) if err != nil { - context.GetLogger(ctx).Errorf("error retrieving ImageStreamImage %s/%s@%s: %v", m.repo.namespace, m.repo.name, dgst.String(), err) return nil, err } - m.pullFromInsecureRegistries = false - - if insecure, ok := isi.Annotations[imageapi.InsecureRepositoryAnnotation]; ok { - m.pullFromInsecureRegistries = insecure == "true" - } - - ref, err := imageapi.ParseDockerImageReference(isi.Image.DockerImageReference) + ref, err := imageapi.ParseDockerImageReference(image.DockerImageReference) if err != nil { - context.GetLogger(ctx).Errorf("bad DockerImageReference in Image %s/%s@%s: %v", m.repo.namespace, m.repo.name, dgst.String(), err) + context.GetLogger(ctx).Errorf("bad DockerImageReference (%q) in Image %s/%s@%s: %v", image.DockerImageReference, m.repo.namespace, m.repo.name, dgst.String(), err) return nil, err } ref = ref.DockerClientDefaults() - retriever := m.repo.importContext() - - repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), m.pullFromInsecureRegistries) + repo, err := m.getRemoteRepositoryClient(ctx, &ref, dgst, options...) if err != nil { context.GetLogger(ctx).Errorf("error getting remote repository for image %q: %v", ref.Exact(), err) return nil, err @@ -81,3 +73,35 @@ func (m *pullthroughManifestService) remoteGet(ctx context.Context, dgst digest. return manifest, err } + +func (m *pullthroughManifestService) getRemoteRepositoryClient(ctx context.Context, ref *imageapi.DockerImageReference, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Repository, error) { + retriever := getImportContext(ctx, m.repo.registryOSClient, m.repo.namespace, m.repo.name) + + // determine, whether to fall-back to insecure transport based on a specification of image's tag + // if the client pulls by tag, use that + tag := "" + for _, option := range options { + if opt, ok := option.(distribution.WithTagOption); ok { + tag = opt.Tag + break + } + } + if len(tag) == 0 { + is, err := m.repo.imageStreamGetter.get() + if err != nil { + return nil, err // this is impossible + } + // if the client pulled by digest, find the corresponding tag in the image stream + tag, _ = imageapi.LatestImageTagEvent(is, dgst.String()) + } + insecure := pullInsecureByDefault(m.repo.imageStreamGetter.get, tag) + + return retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), insecure) +} + +func (m *pullthroughManifestService) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) { + context.GetLogger(ctx).Debugf("(*pullthroughManifestService).Put: enabling remote blob access check") + // manifest dependencies (layers and config) may not be stored locally, we need to be able to stat them in remote repositories + ctx = WithRemoteBlobAccessCheckEnabled(ctx, true) + return m.ManifestService.Put(ctx, manifest, options...) +} diff --git a/pkg/dockerregistry/server/pullthroughmanifestservice_test.go b/pkg/dockerregistry/server/pullthroughmanifestservice_test.go index efa0a0b43916..6e7557bfe442 100644 --- a/pkg/dockerregistry/server/pullthroughmanifestservice_test.go +++ b/pkg/dockerregistry/server/pullthroughmanifestservice_test.go @@ -1,30 +1,30 @@ package server import ( - "encoding/json" "fmt" - "net/http" "net/http/httptest" "net/url" "os" + "strings" "testing" "github.com/docker/distribution" "github.com/docker/distribution/configuration" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/registry/handlers" _ "github.com/docker/distribution/registry/storage/driver/inmemory" - kerrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "github.com/openshift/origin/pkg/client/testclient" registrytest "github.com/openshift/origin/pkg/dockerregistry/testutil" + imageapi "github.com/openshift/origin/pkg/image/api" ) func createTestRegistryServer(t *testing.T, ctx context.Context) *httptest.Server { + ctx = WithTestPassthroughToUpstream(ctx, true) + // pullthrough middleware will attempt to pull from this registry instance remoteRegistryApp := handlers.NewApp(ctx, &configuration.Configuration{ Loglevel: "debug", @@ -39,11 +39,11 @@ func createTestRegistryServer(t *testing.T, ctx context.Context) *httptest.Serve "delete": configuration.Parameters{ "enabled": true, }, - }, - Middleware: map[string][]configuration.Middleware{ - "registry": {{Name: "openshift"}}, - "repository": {{Name: "openshift"}}, - "storage": {{Name: "openshift"}}, + "maintenance": configuration.Parameters{ + "uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }, + }, }, }) @@ -61,6 +61,7 @@ func createTestRegistryServer(t *testing.T, ctx context.Context) *httptest.Serve func TestPullthroughManifests(t *testing.T) { namespace := "fuser" repo := "zapp" + repoName := fmt.Sprintf("%s/%s", namespace, repo) tag := "latest" client := &testclient.Fake{} @@ -74,10 +75,9 @@ func TestPullthroughManifests(t *testing.T) { }() installFakeAccessController(t) + setPassthroughBlobDescriptorServiceFactory() - ctx := context.Background() - - remoteRegistryServer := createTestRegistryServer(t, ctx) + remoteRegistryServer := createTestRegistryServer(t, context.Background()) defer remoteRegistryServer.Close() serverURL, err := url.Parse(remoteRegistryServer.URL) @@ -85,17 +85,26 @@ func TestPullthroughManifests(t *testing.T) { t.Fatalf("error parsing server url: %v", err) } - testImage := createTestImageReactor(t, client, serverURL, namespace, repo) - testImage.DockerImageManifest = "" - - testImageStream := createTestImageStreamReactor(t, client, testImage, namespace, repo, tag) - - client.AddReactor("get", "imagestreamimages", registrytest.GetFakeImageStreamImageGetHandler(t, testImageStream, *testImage)) + ms1dgst, ms1canonical, _, ms1manifest, err := registrytest.CreateAndUploadTestManifest( + registrytest.ManifestSchema1, 2, serverURL, nil, repoName, "schema1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _, ms1payload, err := ms1manifest.Payload() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("ms1dgst=%s, ms1manifest: %s", ms1dgst, ms1canonical) - signedManifest := &schema1.SignedManifest{} - if err := json.Unmarshal([]byte(etcdManifest), signedManifest); err != nil { - t.Fatalf("error unmarshaling signed manifest: %v", err) + image, err := registrytest.NewImageForManifest(repoName, string(ms1payload), "", false) + if err != nil { + t.Fatal(err) } + image.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo, image.Name) + image.DockerImageManifest = "" + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *image)) + + createTestImageStreamReactor(t, client, image, namespace, repo, tag) for _, tc := range []struct { name string @@ -106,18 +115,18 @@ func TestPullthroughManifests(t *testing.T) { expectedNotFoundError bool }{ { - name: "manifest digest", - manifestDigest: etcdDigest, + name: "manifest digest from local store", + manifestDigest: ms1dgst, localData: map[digest.Digest]distribution.Manifest{ - etcdDigest: signedManifest, + ms1dgst: ms1manifest, }, expectedLocalCalls: map[string]int{ "Get": 1, }, }, { - name: "manifest digest XXX", - manifestDigest: digest.Digest(testImage.Name), + name: "manifest served from remote repository", + manifestDigest: digest.Digest(image.Name), expectedLocalCalls: map[string]int{ "Get": 1, }, @@ -131,23 +140,15 @@ func TestPullthroughManifests(t *testing.T) { }, }, } { - localManifestService := newTestManifestService(namespace+"/"+repo, tc.localData) + localManifestService := newTestManifestService(repoName, tc.localData) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } + ctx := WithTestPassthroughToUpstream(context.Background(), false) + + repo := newTestRepositoryForPullthrough(t, ctx, nil, namespace, repo, client, true) ptms := &pullthroughManifestService{ ManifestService: localManifestService, - repo: &repository{ - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: repo, } manifestResult, err := ptms.Get(ctx, tc.manifestDigest) @@ -164,24 +165,270 @@ func TestPullthroughManifests(t *testing.T) { if tc.expectedError { break } - if tc.expectedNotFoundError && err == distribution.ErrBlobUnknown { - break + t.Fatalf("[%s] unexpected error: %#+v", tc.name, err) + } + + if tc.localData != nil { + if manifestResult != nil && manifestResult != tc.localData[tc.manifestDigest] { + t.Fatalf("[%s] unexpected result returned", tc.name) + } + } + + for name, count := range localManifestService.calls { + expectCount, exists := tc.expectedLocalCalls[name] + if !exists { + t.Errorf("[%s] expected no calls to method %s of local manifest service, got %d", tc.name, name, count) } - // TODO: The middleware should return distribution errors, not kube ones. - if e, ok := err.(*kerrors.StatusError); ok { - if tc.expectedNotFoundError && e.ErrStatus.Code == http.StatusNotFound { - break + if count != expectCount { + t.Errorf("[%s] unexpected number of calls to method %s of local manifest service, got %d", tc.name, name, count) + } + } + } +} + +func TestPullthroughManifestInsecure(t *testing.T) { + namespace := "fuser" + repo := "zapp" + repoName := fmt.Sprintf("%s/%s", namespace, repo) + + installFakeAccessController(t) + setPassthroughBlobDescriptorServiceFactory() + + remoteRegistryServer := createTestRegistryServer(t, context.Background()) + defer remoteRegistryServer.Close() + + serverURL, err := url.Parse(remoteRegistryServer.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + + ms1dgst, ms1canonical, _, ms1manifest, err := registrytest.CreateAndUploadTestManifest( + registrytest.ManifestSchema1, 2, serverURL, nil, repoName, "schema1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _, ms1payload, err := ms1manifest.Payload() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("ms1dgst=%s, ms1manifest: %s", ms1dgst, ms1canonical) + ms2dgst, ms2canonical, ms2config, ms2manifest, err := registrytest.CreateAndUploadTestManifest( + registrytest.ManifestSchema2, 2, serverURL, nil, repoName, "schema2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _, ms2payload, err := ms2manifest.Payload() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("ms2dgst=%s, ms2manifest: %s", ms2dgst, ms2canonical) + + ms1img, err := registrytest.NewImageForManifest(repoName, string(ms1payload), "", false) + if err != nil { + t.Fatal(err) + } + ms1img.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo, ms1img.Name) + ms1img.DockerImageManifest = "" + ms2img, err := registrytest.NewImageForManifest(repoName, string(ms2payload), ms2config, false) + if err != nil { + t.Fatal(err) + } + ms2img.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo, ms2img.Name) + ms2img.DockerImageManifest = "" + + for _, tc := range []struct { + name string + manifestDigest digest.Digest + localData map[digest.Digest]distribution.Manifest + imageStreamInit func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) + expectedManifest distribution.Manifest + expectedLocalCalls map[string]int + expectedErrorString string + }{ + + { + name: "fetch schema 1 with allowed insecure", + manifestDigest: ms1dgst, + imageStreamInit: func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *ms1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo, "schema1", string(ms1dgst), ms1img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "true"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return ms1img, is + }, + expectedManifest: ms1manifest, + expectedLocalCalls: map[string]int{ + "Get": 1, + }, + }, + + { + name: "fetch schema 2 with allowed insecure", + manifestDigest: ms2dgst, + imageStreamInit: func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *ms2img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo, "schema2", string(ms2dgst), ms2img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "true"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return ms2img, is + }, + expectedManifest: ms2manifest, + expectedLocalCalls: map[string]int{ + "Get": 1, + }, + }, + + { + name: "explicit forbid insecure", + manifestDigest: ms1dgst, + imageStreamInit: func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *ms1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo, "schema1", string(ms1dgst), ms1img.DockerImageReference) + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "false"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return ms1img, is + }, + expectedErrorString: "server gave HTTP response to HTTPS client", + expectedLocalCalls: map[string]int{ + "Get": 1, + }, + }, + + { + name: "implicit forbid insecure", + manifestDigest: ms1dgst, + imageStreamInit: func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) { + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *ms1img)) + + is := registrytest.TestNewImageStreamObject(namespace, repo, "schema1", string(ms1dgst), ms1img.DockerImageReference) + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return ms1img, is + }, + expectedErrorString: "server gave HTTP response to HTTPS client", + expectedLocalCalls: map[string]int{ + "Get": 1, + }, + }, + + { + name: "pullthrough from insecure tag", + manifestDigest: ms1dgst, + imageStreamInit: func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) { + image, err := registrytest.NewImageForManifest(repoName, string(ms1payload), "", false) + if err != nil { + t.Fatal(err) + } + image.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo, ms1dgst) + image.DockerImageManifest = "" + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *image)) + + is := registrytest.TestNewImageStreamObject(namespace, repo, "schema1", string(ms1dgst), ms1img.DockerImageReference) + is.Spec.Tags = map[string]imageapi.TagReference{ + "foo": { + Name: "foo", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + "schema1": { + Name: "schema1", + ImportPolicy: imageapi.TagImportPolicy{Insecure: true}, + }, + } + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return image, is + }, + expectedManifest: ms1manifest, + expectedLocalCalls: map[string]int{ + "Get": 1, + }, + }, + + { + name: "pull insecure if either image stream is insecure or the tag", + manifestDigest: ms2dgst, + imageStreamInit: func(client *testclient.Fake) (*imageapi.Image, *imageapi.ImageStream) { + image, err := registrytest.NewImageForManifest(repoName, string(ms2payload), ms2config, false) + if err != nil { + t.Fatal(err) + } + image.DockerImageReference = fmt.Sprintf("%s/%s/%s@%s", serverURL.Host, namespace, repo, image.Name) + image.DockerImageManifest = "" + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *image)) + + is := registrytest.TestNewImageStreamObject(namespace, repo, "schema2", image.Name, image.DockerImageReference) + is.Spec.Tags = map[string]imageapi.TagReference{ + "foo": { + Name: "foo", + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + "schema2": { + Name: "schema2", + // the value doesn't override is annotation because we cannot determine whether the + // value is explicit or just the default + ImportPolicy: imageapi.TagImportPolicy{Insecure: false}, + }, + } + is.Annotations = map[string]string{imageapi.InsecureRepositoryAnnotation: "true"} + client.AddReactor("get", "imagestreams", registrytest.GetFakeImageStreamGetHandler(t, *is)) + return image, is + }, + expectedManifest: ms2manifest, + expectedLocalCalls: map[string]int{ + "Get": 1, + }, + }, + } { + client := &testclient.Fake{} + + // TODO: get rid of those nasty global vars + backupRegistryClient := DefaultRegistryClient + DefaultRegistryClient = makeFakeRegistryClient(client, fake.NewSimpleClientset()) + defer func() { + // set it back once this test finishes to make other unit tests working again + DefaultRegistryClient = backupRegistryClient + }() + + tc.imageStreamInit(client) + + localManifestService := newTestManifestService(repoName, tc.localData) + + ctx := WithTestPassthroughToUpstream(context.Background(), false) + repo := newTestRepositoryForPullthrough(t, ctx, nil, namespace, repo, client, true) + ctx = WithRepository(ctx, repo) + + ptms := &pullthroughManifestService{ + ManifestService: localManifestService, + repo: repo, + } + + manifestResult, err := ptms.Get(ctx, tc.manifestDigest) + switch err.(type) { + case nil: + if len(tc.expectedErrorString) > 0 { + t.Errorf("[%s] unexpected successful response", tc.name) + continue + } + default: + if len(tc.expectedErrorString) > 0 { + if !strings.Contains(err.Error(), tc.expectedErrorString) { + t.Fatalf("expected error string %q, got instead: %s (%#+v)", tc.expectedErrorString, err.Error(), err) } + break } - t.Fatalf("[%s] unexpected error: %#+v", tc.name, err) + t.Fatalf("[%s] unexpected error: %#+v (%s)", tc.name, err, err.Error()) } if tc.localData != nil { if manifestResult != nil && manifestResult != tc.localData[tc.manifestDigest] { - t.Fatalf("[%s] unexpected result returned", tc.name) + t.Errorf("[%s] unexpected result returned", tc.name) + continue } } + registrytest.AssertManifestsEqual(t, tc.name, manifestResult, tc.expectedManifest) + for name, count := range localManifestService.calls { expectCount, exists := tc.expectedLocalCalls[name] if !exists { diff --git a/pkg/dockerregistry/server/remoteblobgetter.go b/pkg/dockerregistry/server/remoteblobgetter.go index 999fdd86f81c..980633d45966 100644 --- a/pkg/dockerregistry/server/remoteblobgetter.go +++ b/pkg/dockerregistry/server/remoteblobgetter.go @@ -2,13 +2,16 @@ package server import ( "net/http" + "sort" + "time" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/api/errcode" + disterrors "github.com/docker/distribution/registry/api/v2" - "k8s.io/kubernetes/pkg/api/errors" - + osclient "github.com/openshift/origin/pkg/client" imageapi "github.com/openshift/origin/pkg/image/api" "github.com/openshift/origin/pkg/image/importer" ) @@ -20,76 +23,104 @@ type BlobGetterService interface { distribution.BlobServer } +type ImageStreamGetter func() (*imageapi.ImageStream, error) + // remoteBlobGetterService implements BlobGetterService and allows to serve blobs from remote // repositories. type remoteBlobGetterService struct { - repo *repository - digestToStore map[string]distribution.BlobStore - pullFromInsecureRegistries bool + namespace string + name string + cacheTTL time.Duration + getImageStream ImageStreamGetter + isSecretsNamespacer osclient.ImageStreamSecretsNamespacer + cachedLayers digestToRepositoryCache + digestToStore map[string]distribution.BlobStore } var _ BlobGetterService = &remoteBlobGetterService{} +// NewBlobGetterService returns a getter for remote blobs. Its cache will be shared among different middleware +// wrappers, which is a must at least for stat calls made on manifest's dependencies during its verification. +func NewBlobGetterService( + namespace, name string, + cacheTTL time.Duration, + imageStreamGetter ImageStreamGetter, + isSecretsNamespacer osclient.ImageStreamSecretsNamespacer, + cachedLayers digestToRepositoryCache, +) BlobGetterService { + return &remoteBlobGetterService{ + namespace: namespace, + name: name, + getImageStream: imageStreamGetter, + isSecretsNamespacer: isSecretsNamespacer, + cacheTTL: cacheTTL, + cachedLayers: cachedLayers, + digestToStore: make(map[string]distribution.BlobStore), + } +} + +// imagePullthroughSpec contains a reference of remote image to pull associated with an insecure flag for the +// corresponding registry. +type imagePullthroughSpec struct { + dockerImageReference *imageapi.DockerImageReference + insecure bool +} + // Stat provides metadata about a blob identified by the digest. If the // blob is unknown to the describer, ErrBlobUnknown will be returned. -func (rbs *remoteBlobGetterService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { +func (rbgs *remoteBlobGetterService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Stat: starting with dgst=%s", dgst.String()) // look up the potential remote repositories that this blob could be part of (at this time, // we don't know which image in the image stream surfaced the content). - is, err := rbs.repo.getImageStream() + is, err := rbgs.getImageStream() if err != nil { - if errors.IsNotFound(err) || errors.IsForbidden(err) { + if t, ok := err.(errcode.Error); ok && t.ErrorCode() == disterrors.ErrorCodeNameUnknown { return distribution.Descriptor{}, distribution.ErrBlobUnknown } - context.GetLogger(ctx).Errorf("Error retrieving image stream for blob: %v", err) return distribution.Descriptor{}, err } - rbs.pullFromInsecureRegistries = false - - if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok { - rbs.pullFromInsecureRegistries = insecure == "true" - } - var localRegistry string if local, err := imageapi.ParseDockerImageReference(is.Status.DockerImageRepository); err == nil { // TODO: normalize further? localRegistry = local.Registry } - retriever := rbs.repo.importContext() - cached := rbs.repo.cachedLayers.RepositoriesForDigest(dgst) + retriever := getImportContext(ctx, rbgs.isSecretsNamespacer, rbgs.namespace, rbgs.name) + cached := rbgs.cachedLayers.RepositoriesForDigest(dgst) // look at the first level of tagged repositories first - search := rbs.identifyCandidateRepositories(is, localRegistry, true) - if desc, err := rbs.findCandidateRepository(ctx, search, cached, dgst, retriever); err == nil { + repositoryCandidates, search := identifyCandidateRepositories(is, localRegistry, true) + if desc, err := rbgs.findCandidateRepository(ctx, repositoryCandidates, search, cached, dgst, retriever); err == nil { return desc, nil } // look at all other repositories tagged by the server - secondary := rbs.identifyCandidateRepositories(is, localRegistry, false) + repositoryCandidates, secondary := identifyCandidateRepositories(is, localRegistry, false) for k := range search { delete(secondary, k) } - if desc, err := rbs.findCandidateRepository(ctx, secondary, cached, dgst, retriever); err == nil { + if desc, err := rbgs.findCandidateRepository(ctx, repositoryCandidates, secondary, cached, dgst, retriever); err == nil { return desc, nil } return distribution.Descriptor{}, distribution.ErrBlobUnknown } -func (rbs *remoteBlobGetterService) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { - store, ok := rbs.digestToStore[dgst.String()] +func (rbgs *remoteBlobGetterService) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Open: starting with dgst=%s", dgst.String()) + store, ok := rbgs.digestToStore[dgst.String()] if ok { return store.Open(ctx, dgst) } - desc, err := rbs.Stat(ctx, dgst) + desc, err := rbgs.Stat(ctx, dgst) if err != nil { context.GetLogger(ctx).Errorf("Open: failed to stat blob %q in remote repositories: %v", dgst.String(), err) return nil, err } - store, ok = rbs.digestToStore[desc.Digest.String()] + store, ok = rbgs.digestToStore[desc.Digest.String()] if !ok { return nil, distribution.ErrBlobUnknown } @@ -97,19 +128,20 @@ func (rbs *remoteBlobGetterService) Open(ctx context.Context, dgst digest.Digest return store.Open(ctx, desc.Digest) } -func (rbs *remoteBlobGetterService) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error { - store, ok := rbs.digestToStore[dgst.String()] +func (rbgs *remoteBlobGetterService) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error { + context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).ServeBlob: starting with dgst=%s", dgst.String()) + store, ok := rbgs.digestToStore[dgst.String()] if ok { return store.ServeBlob(ctx, w, req, dgst) } - desc, err := rbs.Stat(ctx, dgst) + desc, err := rbgs.Stat(ctx, dgst) if err != nil { context.GetLogger(ctx).Errorf("ServeBlob: failed to stat blob %q in remote repositories: %v", dgst.String(), err) return err } - store, ok = rbs.digestToStore[desc.Digest.String()] + store, ok = rbgs.digestToStore[desc.Digest.String()] if !ok { return distribution.ErrBlobUnknown } @@ -118,47 +150,53 @@ func (rbs *remoteBlobGetterService) ServeBlob(ctx context.Context, w http.Respon } // proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found, -// rbs.digestToStore saves the store. -func (rbs *remoteBlobGetterService) proxyStat(ctx context.Context, retriever importer.RepositoryRetriever, ref imageapi.DockerImageReference, dgst digest.Digest) (distribution.Descriptor, error) { - context.GetLogger(ctx).Infof("Trying to stat %q from %q", dgst, ref.Exact()) - - ctx = WithRemoteBlobGetter(ctx, rbs) - - repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), rbs.pullFromInsecureRegistries) +// rbgs.digestToStore saves the store. +func (rbgs *remoteBlobGetterService) proxyStat( + ctx context.Context, + retriever importer.RepositoryRetriever, + spec *imagePullthroughSpec, + dgst digest.Digest, +) (distribution.Descriptor, error) { + ref := spec.dockerImageReference + insecureNote := "" + if spec.insecure { + insecureNote = " with a fall-back to insecure transport" + } + context.GetLogger(ctx).Infof("Trying to stat %q from %q%s", dgst, ref.AsRepository().Exact(), insecureNote) + repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), spec.insecure) if err != nil { - context.GetLogger(ctx).Errorf("error getting remote repository for image %q: %v", ref.Exact(), err) + context.GetLogger(ctx).Errorf("Error getting remote repository for image %q: %v", ref.AsRepository().Exact(), err) return distribution.Descriptor{}, err } - bs := repo.Blobs(ctx) - - desc, err := bs.Stat(ctx, dgst) + pullthroughBlobStore := repo.Blobs(ctx) + desc, err := pullthroughBlobStore.Stat(ctx, dgst) if err != nil { if err != distribution.ErrBlobUnknown { - context.GetLogger(ctx).Errorf("error getting remoteBlobGetterService for image %q: %v", ref.Exact(), err) + context.GetLogger(ctx).Errorf("Error statting blob %s in remote repository %q: %v", dgst, ref.AsRepository().Exact(), err) } return distribution.Descriptor{}, err } - rbs.digestToStore[dgst.String()] = bs - + rbgs.digestToStore[dgst.String()] = pullthroughBlobStore return desc, nil } // Get attempts to fetch the requested blob by digest using a remote proxy store if necessary. -func (rbs *remoteBlobGetterService) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { - store, ok := rbs.digestToStore[dgst.String()] +func (rbgs *remoteBlobGetterService) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Get: starting with dgst=%s", dgst.String()) + store, ok := rbgs.digestToStore[dgst.String()] if ok { return store.Get(ctx, dgst) } - desc, err := rbs.Stat(ctx, dgst) + desc, err := rbgs.Stat(ctx, dgst) if err != nil { context.GetLogger(ctx).Errorf("Get: failed to stat blob %q in remote repositories: %v", dgst.String(), err) return nil, err } - store, ok = rbs.digestToStore[desc.Digest.String()] + store, ok = rbgs.digestToStore[desc.Digest.String()] if !ok { return nil, distribution.ErrBlobUnknown } @@ -167,7 +205,14 @@ func (rbs *remoteBlobGetterService) Get(ctx context.Context, dgst digest.Digest) } // findCandidateRepository looks in search for a particular blob, referring to previously cached items -func (rbs *remoteBlobGetterService) findCandidateRepository(ctx context.Context, search map[string]*imageapi.DockerImageReference, cachedLayers []string, dgst digest.Digest, retriever importer.RepositoryRetriever) (distribution.Descriptor, error) { +func (rbgs *remoteBlobGetterService) findCandidateRepository( + ctx context.Context, + repositoryCandidates []string, + search map[string]imagePullthroughSpec, + cachedLayers []string, + dgst digest.Digest, + retriever importer.RepositoryRetriever, +) (distribution.Descriptor, error) { // no possible remote locations to search, exit early if len(search) == 0 { return distribution.Descriptor{}, distribution.ErrBlobUnknown @@ -176,11 +221,11 @@ func (rbs *remoteBlobGetterService) findCandidateRepository(ctx context.Context, // see if any of the previously located repositories containing this digest are in this // image stream for _, repo := range cachedLayers { - ref, ok := search[repo] + spec, ok := search[repo] if !ok { continue } - desc, err := rbs.proxyStat(ctx, retriever, *ref, dgst) + desc, err := rbgs.proxyStat(ctx, retriever, &spec, dgst) if err != nil { delete(search, repo) continue @@ -190,12 +235,13 @@ func (rbs *remoteBlobGetterService) findCandidateRepository(ctx context.Context, } // search the remaining registries for this digest - for repo, ref := range search { - desc, err := rbs.proxyStat(ctx, retriever, *ref, dgst) + for _, repo := range repositoryCandidates { + spec := search[repo] + desc, err := rbgs.proxyStat(ctx, retriever, &spec, dgst) if err != nil { continue } - rbs.repo.cachedLayers.RememberDigest(dgst, rbs.repo.blobrepositorycachettl, repo) + rbgs.cachedLayers.RememberDigest(dgst, rbgs.cacheTTL, repo) context.GetLogger(ctx).Infof("Found digest location by search %q in %q", dgst, repo) return desc, nil } @@ -203,11 +249,54 @@ func (rbs *remoteBlobGetterService) findCandidateRepository(ctx context.Context, return distribution.Descriptor{}, distribution.ErrBlobUnknown } -// identifyCandidateRepositories returns a map of remote repositories referenced by this image stream. -func (rbs *remoteBlobGetterService) identifyCandidateRepositories(is *imageapi.ImageStream, localRegistry string, primary bool) map[string]*imageapi.DockerImageReference { +type byInsecureFlag struct { + repositories []string + specs []*imagePullthroughSpec +} + +func (by *byInsecureFlag) Len() int { + if len(by.specs) < len(by.repositories) { + return len(by.specs) + } + return len(by.repositories) +} +func (by *byInsecureFlag) Swap(i, j int) { + by.repositories[i], by.repositories[j] = by.repositories[j], by.repositories[i] + by.specs[i], by.specs[j] = by.specs[j], by.specs[i] +} +func (by *byInsecureFlag) Less(i, j int) bool { + if by.specs[i].insecure == by.specs[j].insecure { + switch { + case by.repositories[i] < by.repositories[j]: + return true + case by.repositories[i] > by.repositories[j]: + return false + default: + return by.specs[i].dockerImageReference.Exact() < by.specs[j].dockerImageReference.Exact() + } + } + return !by.specs[i].insecure +} + +// identifyCandidateRepositories returns a list of remote repository names sorted from the best candidate to +// the worst and a map of remote repositories referenced by this image stream. The best candidate is a secure +// one. The worst allows for insecure transport. +func identifyCandidateRepositories( + is *imageapi.ImageStream, + localRegistry string, + primary bool, +) ([]string, map[string]imagePullthroughSpec) { + insecureByDefault := false + if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok { + insecureByDefault = insecure == "true" + } + + // maps registry to insecure flag + insecureRegistries := make(map[string]bool) + // identify the canonical location of referenced registries to search search := make(map[string]*imageapi.DockerImageReference) - for _, tagEvent := range is.Status.Tags { + for tag, tagEvent := range is.Status.Tags { var candidates []imageapi.TagEvent if primary { if len(tagEvent.Items) == 0 { @@ -231,8 +320,55 @@ func (rbs *remoteBlobGetterService) identifyCandidateRepositories(is *imageapi.I continue } ref = ref.DockerClientDefaults() + insecure := insecureByDefault + if tagRef, ok := is.Spec.Tags[tag]; ok { + insecure = insecureByDefault || tagRef.ImportPolicy.Insecure + } + if is := insecureRegistries[ref.Registry]; !is && insecure { + insecureRegistries[ref.Registry] = insecure + } + search[ref.AsRepository().Exact()] = &ref } } - return search + + repositories := make([]string, 0, len(search)) + results := make(map[string]imagePullthroughSpec) + specs := []*imagePullthroughSpec{} + for repo, ref := range search { + repositories = append(repositories, repo) + // accompany the reference with corresponding registry's insecure flag + spec := imagePullthroughSpec{ + dockerImageReference: ref, + insecure: insecureRegistries[ref.Registry], + } + results[repo] = spec + specs = append(specs, &spec) + } + + sort.Sort(&byInsecureFlag{repositories: repositories, specs: specs}) + + return repositories, results +} + +// pullInsecureByDefault returns true if the given repository or repository's tag allows for insecure +// transport. +func pullInsecureByDefault(isGetter ImageStreamGetter, tag string) bool { + insecureByDefault := false + + is, err := isGetter() + if err != nil { + return insecureByDefault + } + + if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok { + insecureByDefault = insecure == "true" + } + + if insecureByDefault || len(tag) == 0 { + return insecureByDefault + } + + tagReference, ok := is.Spec.Tags[tag] + return ok && tagReference.ImportPolicy.Insecure } diff --git a/pkg/dockerregistry/server/remoteblobgetter_test.go b/pkg/dockerregistry/server/remoteblobgetter_test.go new file mode 100644 index 000000000000..4da95b88f55c --- /dev/null +++ b/pkg/dockerregistry/server/remoteblobgetter_test.go @@ -0,0 +1,223 @@ +package server + +import ( + "reflect" + "testing" + + _ "github.com/docker/distribution/registry/storage/driver/inmemory" + + kapi "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/diff" + + imageapi "github.com/openshift/origin/pkg/image/api" +) + +func TestIdentifyCandidateRepositories(t *testing.T) { + for _, tc := range []struct { + name string + is *imageapi.ImageStream + localRegistry string + primary bool + expectedRepositories []string + expectedSearch map[string]imagePullthroughSpec + }{ + { + name: "empty image stream", + is: &imageapi.ImageStream{}, + localRegistry: "localhost:5000", + }, + + { + name: "secure image stream with one image", + is: &imageapi.ImageStream{ + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "tag": { + Items: []imageapi.TagEvent{{DockerImageReference: "docker.io/busybox"}}, + }, + }, + }, + }, + localRegistry: "localhost:5000", + primary: true, + expectedRepositories: []string{"docker.io/library/busybox"}, + expectedSearch: map[string]imagePullthroughSpec{ + "docker.io/library/busybox": makeTestImagePullthroughSpec(t, "docker.io/library/busybox:latest", false), + }, + }, + + { + name: "secure image stream with one insecure image", + is: &imageapi.ImageStream{ + Spec: imageapi.ImageStreamSpec{ + Tags: map[string]imageapi.TagReference{ + "insecure": {ImportPolicy: imageapi.TagImportPolicy{Insecure: true}}, + }, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "secure": { + Items: []imageapi.TagEvent{ + {DockerImageReference: "example.org/user/app:tag"}, + {DockerImageReference: "secure.example.org/user/app"}, + }, + }, + "insecure": { + Items: []imageapi.TagEvent{ + {DockerImageReference: "registry.example.org/user/app"}, + {DockerImageReference: "other.org/user/app"}, + }, + }, + }, + }, + }, + localRegistry: "localhost:5000", + primary: true, + expectedRepositories: []string{"example.org/user/app", "registry.example.org/user/app"}, + expectedSearch: map[string]imagePullthroughSpec{ + "example.org/user/app": makeTestImagePullthroughSpec(t, "example.org/user/app:tag", false), + "registry.example.org/user/app": makeTestImagePullthroughSpec(t, "registry.example.org/user/app:latest", true), + }, + }, + + { + name: "search secondary results in insecure image stream", + is: &imageapi.ImageStream{ + ObjectMeta: kapi.ObjectMeta{ + Annotations: map[string]string{imageapi.InsecureRepositoryAnnotation: "true"}, + }, + Spec: imageapi.ImageStreamSpec{ + Tags: map[string]imageapi.TagReference{ + "insecure": {ImportPolicy: imageapi.TagImportPolicy{Insecure: false}}, + }, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "secure": { + Items: []imageapi.TagEvent{ + {DockerImageReference: "example.org/user/app:tag"}, + {DockerImageReference: "example.org/app:tag2"}, + }, + }, + "insecure": {Items: []imageapi.TagEvent{{DockerImageReference: "registry.example.org/user/app"}}}, + }, + }, + }, + localRegistry: "localhost:5000", + primary: false, + expectedRepositories: []string{"example.org/app"}, + expectedSearch: map[string]imagePullthroughSpec{ + "example.org/app": makeTestImagePullthroughSpec(t, "example.org/app:tag2", true), + }, + }, + + { + name: "empty secondary search", + is: &imageapi.ImageStream{ + ObjectMeta: kapi.ObjectMeta{ + Annotations: map[string]string{imageapi.InsecureRepositoryAnnotation: "true"}, + }, + Spec: imageapi.ImageStreamSpec{ + Tags: map[string]imageapi.TagReference{ + "insecure": {ImportPolicy: imageapi.TagImportPolicy{Insecure: false}}, + }, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "secure": {Items: []imageapi.TagEvent{{DockerImageReference: "example.org/user/app:tag"}}}, + "insecure": {Items: []imageapi.TagEvent{{DockerImageReference: "registry.example.org/user/app"}}}, + }, + }, + }, + localRegistry: "localhost:5000", + primary: false, + }, + + { + name: "insecure flag propagates to the whole registry", + is: &imageapi.ImageStream{ + Spec: imageapi.ImageStreamSpec{ + Tags: map[string]imageapi.TagReference{ + "insecure": {ImportPolicy: imageapi.TagImportPolicy{Insecure: true}}, + }, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "secure": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b/c"}}}, + "insecure": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b/app"}}}, + "foo": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b/c/foo"}}}, + "bar": {Items: []imageapi.TagEvent{{DockerImageReference: "other.b/bar"}}}, + "gas": {Items: []imageapi.TagEvent{{DockerImageReference: "a.a/app"}}}, + }, + }, + }, + localRegistry: "localhost:5000", + primary: true, + expectedRepositories: []string{"a.a/app", "other.b/bar", "a.b/app", "a.b/c", "a.b/c/foo"}, + expectedSearch: map[string]imagePullthroughSpec{ + "a.a/app": makeTestImagePullthroughSpec(t, "a.a/app:latest", false), + "other.b/bar": makeTestImagePullthroughSpec(t, "other.b/bar:latest", false), + "a.b/app": makeTestImagePullthroughSpec(t, "a.b/app:latest", true), + "a.b/c": makeTestImagePullthroughSpec(t, "a.b/c:latest", true), + "a.b/c/foo": makeTestImagePullthroughSpec(t, "a.b/c/foo:latest", true), + }, + }, + + { + name: "duplicate entries", + is: &imageapi.ImageStream{ + Spec: imageapi.ImageStreamSpec{ + Tags: map[string]imageapi.TagReference{ + "insecure": {ImportPolicy: imageapi.TagImportPolicy{Insecure: true}}, + }, + }, + Status: imageapi.ImageStreamStatus{ + Tags: map[string]imageapi.TagEventList{ + "secure": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b/foo"}}}, + "insecure": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b/app:latest"}}}, + "foo": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b/app"}}}, + "bar": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b.c/app"}}}, + "gas": {Items: []imageapi.TagEvent{{DockerImageReference: "a.b.c/app"}}}, + }, + }, + }, + localRegistry: "localhost:5000", + primary: true, + expectedRepositories: []string{"a.b.c/app", "a.b/app", "a.b/foo"}, + expectedSearch: map[string]imagePullthroughSpec{ + "a.b.c/app": makeTestImagePullthroughSpec(t, "a.b.c/app:latest", false), + "a.b/app": makeTestImagePullthroughSpec(t, "a.b/app:latest", true), + "a.b/foo": makeTestImagePullthroughSpec(t, "a.b/foo:latest", true), + }, + }, + } { + repositories, search := identifyCandidateRepositories(tc.is, tc.localRegistry, tc.primary) + + if !reflect.DeepEqual(repositories, tc.expectedRepositories) { + if len(repositories) != 0 || len(tc.expectedRepositories) != 0 { + t.Errorf("[%s] got unexpected repositories: %s", tc.name, diff.ObjectGoPrintDiff(repositories, tc.expectedRepositories)) + } + } + + for repo, spec := range search { + if expSpec, exists := tc.expectedSearch[repo]; !exists { + t.Errorf("[%s] got unexpected repository among results: %q: %#+v", tc.name, repo, spec) + } else if !reflect.DeepEqual(spec, expSpec) { + t.Errorf("[%s] got unexpected pull spec for repo %q: %s", tc.name, repo, diff.ObjectGoPrintDiff(spec, expSpec)) + } + } + for expRepo, expSpec := range tc.expectedSearch { + if _, exists := tc.expectedSearch[expRepo]; !exists { + t.Errorf("[%s] missing expected repository among results: %q: %#+v", tc.name, expRepo, expSpec) + } + } + } +} + +func makeTestImagePullthroughSpec(t *testing.T, ref string, insecure bool) imagePullthroughSpec { + r, err := imageapi.ParseDockerImageReference(ref) + if err != nil { + t.Fatal(err) + } + return imagePullthroughSpec{dockerImageReference: &r, insecure: insecure} +} diff --git a/pkg/dockerregistry/server/repositorymiddleware.go b/pkg/dockerregistry/server/repositorymiddleware.go index b0958e510786..e6264bb76671 100644 --- a/pkg/dockerregistry/server/repositorymiddleware.go +++ b/pkg/dockerregistry/server/repositorymiddleware.go @@ -11,17 +11,18 @@ import ( "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/registry/api/errcode" repomw "github.com/docker/distribution/registry/middleware/repository" + registrystorage "github.com/docker/distribution/registry/storage" - kapi "k8s.io/kubernetes/pkg/api" + kerrors "k8s.io/kubernetes/pkg/api/errors" kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" "k8s.io/kubernetes/pkg/client/restclient" "github.com/openshift/origin/pkg/client" - imageapi "github.com/openshift/origin/pkg/image/api" - "github.com/openshift/origin/pkg/image/importer" - "github.com/openshift/origin/pkg/dockerregistry/server/audit" + imageapi "github.com/openshift/origin/pkg/image/api" + quotautil "github.com/openshift/origin/pkg/quota/util" ) const ( @@ -143,10 +144,16 @@ type repository struct { acceptschema2 bool // blobrepositorycachettl is an eviction timeout for entries of cachedLayers blobrepositorycachettl time.Duration + // cachedImages contains images cached for the lifetime of the request being handled. + cachedImages map[digest.Digest]*imageapi.Image + // cachedImageStream stays cached for the entire time of handling signle repository-scoped request. + imageStreamGetter *cachedImageStreamGetter // cachedLayers remembers a mapping of layer digest to repositories recently seen with that image to avoid // having to check every potential upstream repository when a blob request is made. The cache is useful only // when session affinity is on for the registry, but in practice the first pull will fill the cache. cachedLayers digestToRepositoryCache + // remoteBlobGetter is used to fetch blobs from remote registries if pullthrough is enabled. + remoteBlobGetter BlobGetterService } // newRepositoryWithClient returns a new repository middleware. @@ -184,8 +191,16 @@ func newRepositoryWithClient( if len(nameParts) != 2 { return nil, fmt.Errorf("invalid repository name %q: it must be of the format /", repo.Named().Name()) } + namespace, name := nameParts[0], nameParts[1] + + imageStreamGetter := &cachedImageStreamGetter{ + ctx: ctx, + namespace: namespace, + name: name, + isNamespacer: registryOSClient, + } - return &repository{ + r := &repository{ Repository: repo, ctx: ctx, @@ -199,24 +214,30 @@ func newRepositoryWithClient( blobrepositorycachettl: blobrepositorycachettl, pullthrough: pullthrough, mirrorPullthrough: mirrorPullthrough, + imageStreamGetter: imageStreamGetter, + cachedImages: make(map[digest.Digest]*imageapi.Image), cachedLayers: cachedLayers, - }, nil + } + + if pullthrough { + r.remoteBlobGetter = NewBlobGetterService( + r.namespace, + r.name, + blobrepositorycachettl, + imageStreamGetter.get, + registryOSClient, + cachedLayers) + } + + return r, nil } // Manifests returns r, which implements distribution.ManifestService. func (r *repository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { - if r.pullthrough { - // Add to the context the BlobGetterService that provide access to remote servers. - // It will be used to validate manifest blobs. It only makes sense - // if the pullthrough is enabled. It needs to be instantiated here in order - // to share the cache among different stat calls made on manifest's dependencies. - ctx = WithRemoteBlobGetter(ctx, &remoteBlobGetterService{ - repo: r, - digestToStore: make(map[string]distribution.BlobStore), - }) - } - - ms, err := r.Repository.Manifests(WithRepository(ctx, r)) + // we do a verification of our own + // TODO: let upstream do the verification once they pass correct context object to their manifest handler + opts := append(options, registrystorage.SkipLayerVerification()) + ms, err := r.Repository.Manifests(WithRepository(ctx, r), opts...) if err != nil { return nil, err } @@ -251,27 +272,13 @@ func (r *repository) Manifests(ctx context.Context, options ...distribution.Mani // Blobs returns a blob store which can delegate to remote repositories. func (r *repository) Blobs(ctx context.Context) distribution.BlobStore { - repo := repository(*r) - - if r.pullthrough { - // Add to the context the BlobGetterService that provide access to remote servers. - // It will be used in pullthroughBlobStore. It needs to be instantiated here in - // order to share the cache for multiple stat calls made in descendant blob stores. - ctx = WithRemoteBlobGetter(ctx, &remoteBlobGetterService{ - repo: &repo, - digestToStore: make(map[string]distribution.BlobStore), - }) - } - - repo.ctx = ctx - bs := r.Repository.Blobs(ctx) if !quotaEnforcing.enforcementDisabled { bs = "aRestrictedBlobStore{ BlobStore: bs, - repo: &repo, + repo: r, } } @@ -279,14 +286,14 @@ func (r *repository) Blobs(ctx context.Context) distribution.BlobStore { bs = &pullthroughBlobStore{ BlobStore: bs, - repo: &repo, + repo: r, mirror: r.mirrorPullthrough, } } bs = &errorBlobStore{ store: bs, - repo: &repo, + repo: r, } if audit.LoggerExists(ctx) { @@ -321,32 +328,77 @@ func (r *repository) Tags(ctx context.Context) distribution.TagService { return ts } -// importContext loads secrets for this image stream and returns a context for getting distribution -// clients to remote repositories. -func (r *repository) importContext() importer.RepositoryRetriever { - secrets, err := r.registryOSClient.ImageStreamSecrets(r.namespace).Secrets(r.name, kapi.ListOptions{}) +// createImageStream creates a new image stream corresponding to r and caches it. +func (r *repository) createImageStream(ctx context.Context) (*imageapi.ImageStream, error) { + stream := imageapi.ImageStream{} + stream.Name = r.name + + uclient, ok := UserClientFrom(ctx) + if !ok { + errmsg := "error creating user client to auto provision image stream: user client to master API unavailable" + context.GetLogger(ctx).Errorf(errmsg) + return nil, errcode.ErrorCodeUnknown.WithDetail(errmsg) + } + + is, err := uclient.ImageStreams(r.namespace).Create(&stream) + switch { + case kerrors.IsAlreadyExists(err), kerrors.IsConflict(err): + context.GetLogger(ctx).Infof("conflict while creating ImageStream: %v", err) + return r.imageStreamGetter.get() + case kerrors.IsForbidden(err), kerrors.IsUnauthorized(err), quotautil.IsErrorQuotaExceeded(err): + context.GetLogger(ctx).Errorf("denied creating ImageStream: %v", err) + return nil, errcode.ErrorCodeDenied.WithDetail(err) + case err != nil: + context.GetLogger(ctx).Errorf("error auto provisioning ImageStream: %s", err) + return nil, errcode.ErrorCodeUnknown.WithDetail(err) + } + + r.imageStreamGetter.cacheImageStream(is) + return is, nil +} + +// getImage retrieves the Image with digest `dgst`. No authorization check is done. +func (r *repository) getImage(dgst digest.Digest) (*imageapi.Image, error) { + if image, exists := r.cachedImages[dgst]; exists { + context.GetLogger(r.ctx).Infof("(*repository).getImage: returning cached copy of %s", image.Name) + return image, nil + } + + image, err := r.registryOSClient.Images().Get(dgst.String()) if err != nil { - context.GetLogger(r.ctx).Errorf("error getting secrets for repository %q: %v", r.Named().Name(), err) - secrets = &kapi.SecretList{} + context.GetLogger(r.ctx).Errorf("failed to get image: %v", err) + return nil, wrapKStatusErrorOnGetImage(r.name, dgst, err) } - credentials := importer.NewCredentialsForSecrets(secrets.Items) - return importer.NewContext(secureTransport, insecureTransport).WithCredentials(credentials) -} -// getImageStream retrieves the ImageStream for r. -func (r *repository) getImageStream() (*imageapi.ImageStream, error) { - return r.registryOSClient.ImageStreams(r.namespace).Get(r.name) + context.GetLogger(r.ctx).Infof("(*repository).getImage: got image %s", image.Name) + r.cachedImages[dgst] = image + return image, nil } -// getImage retrieves the Image with digest `dgst`. -func (r *repository) getImage(dgst digest.Digest) (*imageapi.Image, error) { - return r.registryOSClient.Images().Get(dgst.String()) -} +// getImageOfImageStream retrieves the Image with digest `dgst` for the ImageStream associated with r. This +// ensures the image belongs to the image stream. It uses two queries to master API: +// 1st to get a corresponding image stream +// 2nd to get the image +// This allows us to cache the image stream for later use. +func (r *repository) getImageOfImageStream(dgst digest.Digest) (*imageapi.Image, *imageapi.ImageStream, error) { + stream, err := r.imageStreamGetter.get() + if err != nil { + context.GetLogger(r.ctx).Errorf("failed to get ImageStream: %v", err) + return nil, nil, wrapKStatusErrorOnGetImage(r.name, dgst, err) + } + + _, err = imageapi.ResolveImageID(stream, dgst.String()) + if err != nil { + context.GetLogger(r.ctx).Errorf("failed to resolve image %s in ImageStream %s/%s: %v", dgst.String(), r.namespace, r.name, err) + return nil, nil, wrapKStatusErrorOnGetImage(r.name, dgst, err) + } + + image, err := r.getImage(dgst) + if err != nil { + return nil, nil, wrapKStatusErrorOnGetImage(r.name, dgst, err) + } -// getImageStreamImage retrieves the Image with digest `dgst` for the ImageStream -// associated with r. This ensures the image belongs to the image stream. -func (r *repository) getImageStreamImage(dgst digest.Digest) (*imageapi.ImageStreamImage, error) { - return r.registryOSClient.ImageStreamImages(r.namespace).Get(r.name, dgst.String()) + return image, stream, nil } // updateImage modifies the Image. diff --git a/pkg/dockerregistry/server/repositorymiddleware_test.go b/pkg/dockerregistry/server/repositorymiddleware_test.go index 1486c7e41a63..0c0c21768575 100644 --- a/pkg/dockerregistry/server/repositorymiddleware_test.go +++ b/pkg/dockerregistry/server/repositorymiddleware_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + log "github.com/Sirupsen/logrus" + "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" @@ -38,6 +40,10 @@ const ( testImageLayerCount = 2 ) +func init() { + log.SetLevel(log.DebugLevel) +} + func TestRepositoryBlobStat(t *testing.T) { quotaEnforcing = "aEnforcingConfig{} @@ -76,7 +82,7 @@ func TestRepositoryBlobStat(t *testing.T) { name string managed bool }{{"nm/is", true}, {"registry.org:5000/user/app", false}} { - img, err := registrytest.NewImageForManifest(d.name, registrytest.SampleImageManifestSchema1, d.managed) + img, err := registrytest.NewImageForManifest(d.name, registrytest.SampleImageManifestSchema1, "", d.managed) if err != nil { t.Fatal(err) } @@ -390,6 +396,10 @@ func TestRepositoryBlobStatCacheEviction(t *testing.T) { } // query etcd + repo, err = reg.Repository(ctx, ref) // the repository needs to be recreated since it caches image streams and images + if err != nil { + t.Fatalf("failed to get repository: %v", err) + } desc, err = repo.Blobs(ctx).Stat(ctx, blob1Dgst) if err != nil { t.Fatalf("got unexpected stat error: %v", err) @@ -418,6 +428,10 @@ func TestRepositoryBlobStatCacheEviction(t *testing.T) { } // cache hit - don't query etcd + repo, err = reg.Repository(ctx, ref) // the repository needs to be recreated since it caches image streams and images + if err != nil { + t.Fatalf("failed to get repository: %v", err) + } desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) if err != nil { t.Fatalf("got unexpected stat error: %v", err) @@ -431,6 +445,10 @@ func TestRepositoryBlobStatCacheEviction(t *testing.T) { lastStatTimestamp := time.Now() // hit the cache + repo, err = reg.Repository(ctx, ref) + if err != nil { + t.Fatalf("failed to get repository: %v", err) + } desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) if err != nil { t.Fatalf("got unexpected stat error: %v", err) @@ -445,6 +463,10 @@ func TestRepositoryBlobStatCacheEviction(t *testing.T) { t.Logf("sleeping %s while waiting for eviction of blob %q from cache", blobRepoCacheTTL.String(), blob2Dgst.String()) time.Sleep(blobRepoCacheTTL - (time.Now().Sub(lastStatTimestamp))) + repo, err = reg.Repository(ctx, ref) + if err != nil { + t.Fatalf("failed to get repository: %v", err) + } desc, err = repo.Blobs(ctx).Stat(ctx, blob2Dgst) if err != nil { t.Fatalf("got unexpected stat error: %v", err) @@ -686,8 +708,8 @@ type testRegistry struct { var _ distribution.Namespace = &testRegistry{} -func (r *testRegistry) Repository(ctx context.Context, ref reference.Named) (distribution.Repository, error) { - repo, err := r.Namespace.Repository(ctx, ref) +func (reg *testRegistry) Repository(ctx context.Context, ref reference.Named) (distribution.Repository, error) { + repo, err := reg.Namespace.Repository(ctx, ref) if err != nil { return nil, err } @@ -699,20 +721,43 @@ func (r *testRegistry) Repository(ctx context.Context, ref reference.Named) (dis return nil, fmt.Errorf("failed to parse repository name %q", ref.Name()) } - return &repository{ + nm, name := parts[0], parts[1] + + isGetter := &cachedImageStreamGetter{ + ctx: ctx, + namespace: nm, + name: name, + isNamespacer: reg.osClient, + } + + r := &repository{ Repository: repo, ctx: ctx, quotaClient: kFakeClient.Core(), limitClient: kFakeClient.Core(), - registryOSClient: r.osClient, + registryOSClient: reg.osClient, registryAddr: "localhost:5000", - namespace: parts[0], - name: parts[1], - blobrepositorycachettl: r.blobrepositorycachettl, + namespace: nm, + name: name, + blobrepositorycachettl: reg.blobrepositorycachettl, + imageStreamGetter: isGetter, + cachedImages: make(map[digest.Digest]*imageapi.Image), cachedLayers: cachedLayers, - pullthrough: r.pullthrough, - }, nil + pullthrough: reg.pullthrough, + } + + if reg.pullthrough { + r.remoteBlobGetter = NewBlobGetterService( + nm, + name, + defaultBlobRepositoryCacheTTL, + isGetter.get, + reg.osClient, + cachedLayers) + } + + return r, nil } func testNewDescriptorForLayer(layer imageapi.ImageLayer) distribution.Descriptor { diff --git a/pkg/dockerregistry/server/signaturedispatcher_test.go b/pkg/dockerregistry/server/signaturedispatcher_test.go index 093e0ee3f61c..c10d3b6c0d02 100644 --- a/pkg/dockerregistry/server/signaturedispatcher_test.go +++ b/pkg/dockerregistry/server/signaturedispatcher_test.go @@ -50,7 +50,7 @@ func TestSignatureGet(t *testing.T) { Content: []byte("owGbwMvMwMQorp341GLVgXeMpw9kJDFE1LxLq1ZKLsosyUxOzFGyqlbKTEnNK8ksqQSxU/KTs1OLdItS01KLUvOSU5WslHLygeoy8otLrEwNDAz0S1KLS8CEVU4iiFKq1VHKzE1MT0XSnpuYl5kGlNNNyUwHKbFSKs5INDI1szIxMLIwtzBKNrBITUw1SbRItkw0skhKMzMzTDZItEgxTDZKS7ZINbRMSUpMTDVKMjC0SDIyNDA0NLQ0TzU0sTABWVZSWQByVmJJfm5mskJyfl5JYmZeapFCcWZ6XmJJaVE"), } - testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, false) + testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, "", false) if err != nil { t.Fatal(err) } @@ -81,6 +81,11 @@ func TestSignatureGet(t *testing.T) { "delete": configuration.Parameters{ "enabled": true, }, + "maintenance": configuration.Parameters{ + "uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }, + }, }, Middleware: map[string][]configuration.Middleware{ "registry": {{Name: "openshift"}}, @@ -185,6 +190,11 @@ func TestSignaturePut(t *testing.T) { "delete": configuration.Parameters{ "enabled": true, }, + "maintenance": configuration.Parameters{ + "uploadpurging": map[interface{}]interface{}{ + "enabled": false, + }, + }, }, Middleware: map[string][]configuration.Middleware{ "registry": {{Name: "openshift"}}, diff --git a/pkg/dockerregistry/server/tagservice.go b/pkg/dockerregistry/server/tagservice.go index 014525c69c6e..9d931546b2b5 100644 --- a/pkg/dockerregistry/server/tagservice.go +++ b/pkg/dockerregistry/server/tagservice.go @@ -18,7 +18,7 @@ type tagService struct { } func (t tagService) Get(ctx context.Context, tag string) (distribution.Descriptor, error) { - imageStream, err := t.repo.getImageStream() + imageStream, err := t.repo.imageStreamGetter.get() if err != nil { context.GetLogger(ctx).Errorf("error retrieving ImageStream %s/%s: %v", t.repo.namespace, t.repo.name, err) return distribution.Descriptor{}, distribution.ErrRepositoryUnknown{Name: t.repo.Named().Name()} @@ -50,7 +50,7 @@ func (t tagService) Get(ctx context.Context, tag string) (distribution.Descripto func (t tagService) All(ctx context.Context) ([]string, error) { tags := []string{} - imageStream, err := t.repo.getImageStream() + imageStream, err := t.repo.imageStreamGetter.get() if err != nil { context.GetLogger(ctx).Errorf("error retrieving ImageStream %s/%s: %v", t.repo.namespace, t.repo.name, err) return tags, distribution.ErrRepositoryUnknown{Name: t.repo.Named().Name()} @@ -97,7 +97,7 @@ func (t tagService) All(ctx context.Context) ([]string, error) { func (t tagService) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error) { tags := []string{} - imageStream, err := t.repo.getImageStream() + imageStream, err := t.repo.imageStreamGetter.get() if err != nil { context.GetLogger(ctx).Errorf("error retrieving ImageStream %s/%s: %v", t.repo.namespace, t.repo.name, err) return tags, distribution.ErrRepositoryUnknown{Name: t.repo.Named().Name()} @@ -147,7 +147,7 @@ func (t tagService) Lookup(ctx context.Context, desc distribution.Descriptor) ([ } func (t tagService) Tag(ctx context.Context, tag string, dgst distribution.Descriptor) error { - imageStream, err := t.repo.getImageStream() + imageStream, err := t.repo.imageStreamGetter.get() if err != nil { context.GetLogger(ctx).Errorf("error retrieving ImageStream %s/%s: %v", t.repo.namespace, t.repo.name, err) return distribution.ErrRepositoryUnknown{Name: t.repo.Named().Name()} @@ -183,7 +183,7 @@ func (t tagService) Tag(ctx context.Context, tag string, dgst distribution.Descr } func (t tagService) Untag(ctx context.Context, tag string) error { - imageStream, err := t.repo.getImageStream() + imageStream, err := t.repo.imageStreamGetter.get() if err != nil { context.GetLogger(ctx).Errorf("error retrieving ImageStream %s/%s: %v", t.repo.namespace, t.repo.name, err) return distribution.ErrRepositoryUnknown{Name: t.repo.Named().Name()} diff --git a/pkg/dockerregistry/server/tagservice_test.go b/pkg/dockerregistry/server/tagservice_test.go index fb3b77fd4218..89001c7fb13e 100644 --- a/pkg/dockerregistry/server/tagservice_test.go +++ b/pkg/dockerregistry/server/tagservice_test.go @@ -33,6 +33,7 @@ func createTestImageReactor(t *testing.T, client *testclient.Fake, serverURL *ur testImage, err := registrytest.NewImageForManifest( fmt.Sprintf("%s/%s", namespace, repo), string(testManifestSchema1), + "", false) if err != nil { t.Fatal(err) @@ -125,21 +126,10 @@ func TestTagGet(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - + r := newTestRepositoryForPullthrough(t, ctx, nil, namespace, repo, client, tc.pullthrough) ts := &tagService{ TagService: localTagService, - repo: &repository{ - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: tc.pullthrough, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } resultDesc, err := ts.Get(ctx, tc.tagName) @@ -191,27 +181,15 @@ func TestTagGetWithoutImageStream(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, true) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } _, err = ts.Get(ctx, tag) @@ -283,27 +261,14 @@ func TestTagCreation(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } - + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, tc.pullthrough) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: tc.pullthrough, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } err = ts.Tag(ctx, tc.tagName, tc.tagValue) @@ -344,27 +309,14 @@ func TestTagCreationWithoutImageStream(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } - + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, true) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } err = ts.Tag(ctx, tag, distribution.Descriptor{Digest: digest.Digest(testImage.Name)}) @@ -444,27 +396,15 @@ func TestTagDeletion(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, tc.pullthrough) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: tc.pullthrough, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } err = ts.Untag(ctx, tc.tagName) @@ -510,27 +450,14 @@ func TestTagDeletionWithoutImageStream(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } - + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, true) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } err = ts.Untag(ctx, tag) @@ -597,27 +524,14 @@ func TestTagGetAll(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } - + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, tc.pullthrough) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: tc.pullthrough, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } result, err := ts.All(ctx) @@ -655,27 +569,14 @@ func TestTagGetAllWithoutImageStream(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } - + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, true) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } _, err = ts.All(ctx) @@ -753,27 +654,14 @@ func TestTagLookup(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } - + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, tc.pullthrough) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: tc.pullthrough, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } result, err := ts.Lookup(ctx, tc.tagValue) @@ -818,27 +706,15 @@ func TestTagLookupWithoutImageStream(t *testing.T) { localTagService := newTestTagService(nil) - cachedLayers, err := newDigestToRepositoryCache(10) - if err != nil { - t.Fatal(err) - } - named, err := reference.ParseNamed(fmt.Sprintf("%s/%s", namespace, repo)) if err != nil { t.Fatal(err) } + r := newTestRepositoryForPullthrough(t, ctx, &testRepository{name: named}, namespace, repo, client, true) ts := &tagService{ TagService: localTagService, - repo: &repository{ - Repository: &testRepository{name: named}, - ctx: ctx, - namespace: namespace, - name: repo, - pullthrough: true, - cachedLayers: cachedLayers, - registryOSClient: client, - }, + repo: r, } _, err = ts.Lookup(ctx, distribution.Descriptor{Digest: digest.Digest(testImage.Name)}) diff --git a/pkg/dockerregistry/server/util.go b/pkg/dockerregistry/server/util.go index b0864d441fe9..8d791325d11c 100644 --- a/pkg/dockerregistry/server/util.go +++ b/pkg/dockerregistry/server/util.go @@ -8,8 +8,17 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/api/errcode" + disterrors "github.com/docker/distribution/registry/api/v2" + quotautil "github.com/openshift/origin/pkg/quota/util" + kapi "k8s.io/kubernetes/pkg/api" + kerrors "k8s.io/kubernetes/pkg/api/errors" + + osclient "github.com/openshift/origin/pkg/client" imageapi "github.com/openshift/origin/pkg/image/api" + "github.com/openshift/origin/pkg/image/importer" ) // Context keys @@ -18,6 +27,10 @@ const ( repositoryKey = "openshift.repository" remoteGetterServiceKey = "openshift.remote.getter.service" + + // remoteBlobAccessCheckEnabledKey allows blobDescriptorService to stat remote blobs which is useful only + // in case of manifest verification. + remoteBlobAccessCheckEnabledKey = "openshift.remote.blobaccesscheck.enabled" ) func WithRepository(parent context.Context, repo *repository) context.Context { @@ -28,12 +41,12 @@ func RepositoryFrom(ctx context.Context) (repo *repository, found bool) { return } -func WithRemoteBlobGetter(parent context.Context, svc BlobGetterService) context.Context { - return context.WithValue(parent, remoteGetterServiceKey, svc) +func WithRemoteBlobAccessCheckEnabled(parent context.Context, enable bool) context.Context { + return context.WithValue(parent, remoteBlobAccessCheckEnabledKey, enable) } -func RemoteBlobGetterFrom(ctx context.Context) (svc BlobGetterService, found bool) { - svc, found = ctx.Value(remoteGetterServiceKey).(BlobGetterService) - return +func RemoteBlobAccessCheckEnabledFrom(ctx context.Context) bool { + enabled, _ := ctx.Value(remoteBlobAccessCheckEnabledKey).(bool) + return enabled } func getOptionValue( @@ -150,3 +163,77 @@ func isImageManaged(image *imageapi.Image) bool { managed, ok := image.ObjectMeta.Annotations[imageapi.ManagedByOpenShiftAnnotation] return ok && managed == "true" } + +// wrapKStatusErrorOnGetImage transforms the given kubernetes status error into a distribution one. Upstream +// handler do not allow us to propagate custom error messages except for ErrManifetUnknownRevision. All the +// other errors will result in an internal server error with details made out of returned error. +func wrapKStatusErrorOnGetImage(repoName string, dgst digest.Digest, err error) error { + switch { + case kerrors.IsNotFound(err): + // This is the only error type we can propagate unchanged to the client. + return distribution.ErrManifestUnknownRevision{ + Name: repoName, + Revision: dgst, + } + case err != nil: + // We don't turn this error to distribution error on purpose: Upstream manifest handler wraps any + // error but distribution.ErrManifestUnknownRevision with errcode.ErrorCodeUnknown. If we wrap the + // original error with distribution.ErrorCodeUnknown, the "unknown error" will appear twice in the + // resulting error message. + return err + } + + return nil +} + +// getImportContext loads secrets for given repository and returns a context for getting distribution clients +// to remote repositories. +func getImportContext( + ctx context.Context, + osClient osclient.ImageStreamSecretsNamespacer, + namespace, name string, +) importer.RepositoryRetriever { + secrets, err := osClient.ImageStreamSecrets(namespace).Secrets(name, kapi.ListOptions{}) + if err != nil { + context.GetLogger(ctx).Errorf("error getting secrets for repository %s/%s: %v", namespace, name, err) + secrets = &kapi.SecretList{} + } + credentials := importer.NewCredentialsForSecrets(secrets.Items) + return importer.NewContext(secureTransport, insecureTransport).WithCredentials(credentials) +} + +// cachedImageStreamGetter wraps a master API client for getting image streams with a cache. +type cachedImageStreamGetter struct { + ctx context.Context + namespace string + name string + isNamespacer osclient.ImageStreamsNamespacer + cachedImageStream *imageapi.ImageStream +} + +func (g *cachedImageStreamGetter) get() (*imageapi.ImageStream, error) { + if g.cachedImageStream != nil { + context.GetLogger(g.ctx).Debugf("(*cachedImageStreamGetter).getImageStream: returning cached copy") + return g.cachedImageStream, nil + } + is, err := g.isNamespacer.ImageStreams(g.namespace).Get(g.name) + if err != nil { + context.GetLogger(g.ctx).Errorf("failed to get image stream: %v", err) + switch { + case kerrors.IsNotFound(err): + return nil, disterrors.ErrorCodeNameUnknown.WithDetail(err) + case kerrors.IsForbidden(err), kerrors.IsUnauthorized(err), quotautil.IsErrorQuotaExceeded(err): + return nil, errcode.ErrorCodeDenied.WithDetail(err) + default: + return nil, errcode.ErrorCodeUnknown.WithDetail(err) + } + } + + context.GetLogger(g.ctx).Debugf("(*cachedImageStreamGetter).getImageStream: got image stream %s/%s", is.Namespace, is.Name) + g.cachedImageStream = is + return is, nil +} + +func (g *cachedImageStreamGetter) cacheImageStream(is *imageapi.ImageStream) { + g.cachedImageStream = is +} diff --git a/pkg/dockerregistry/testutil/manifests.go b/pkg/dockerregistry/testutil/manifests.go index dd7f7a190a55..84ad49c48c6d 100644 --- a/pkg/dockerregistry/testutil/manifests.go +++ b/pkg/dockerregistry/testutil/manifests.go @@ -4,18 +4,30 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" + "net/url" + "reflect" + "testing" "github.com/docker/distribution" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/reference" + distclient "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/auth" + "github.com/docker/distribution/registry/client/transport" "github.com/docker/libtrust" + kapi "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/diff" + imageapi "github.com/openshift/origin/pkg/image/api" ) -type ManifestSchemaVesion int +type ManifestSchemaVersion int type LayerPayload []byte type ConfigPayload []byte @@ -25,25 +37,27 @@ type Payload struct { } const ( - ManifestSchema1 ManifestSchemaVesion = 1 - ManifestSchema2 ManifestSchemaVesion = 2 + ManifestSchema1 ManifestSchemaVersion = 1 + ManifestSchema2 ManifestSchemaVersion = 2 ) // MakeSchema1Manifest constructs a schema 1 manifest from a given list of digests and returns // the digest of the manifest // github.com/docker/distribution/testutil -func MakeSchema1Manifest(layers []distribution.Descriptor) (string, distribution.Manifest, error) { - manifest := schema1.Manifest{ +func MakeSchema1Manifest(name, tag string, layers []distribution.Descriptor) (string, distribution.Manifest, error) { + m := schema1.Manifest{ Versioned: manifest.Versioned{ SchemaVersion: 1, }, - Name: "who", - Tag: "cares", + FSLayers: make([]schema1.FSLayer, 0, len(layers)), + History: make([]schema1.History, 0, len(layers)), + Name: name, + Tag: tag, } for _, layer := range layers { - manifest.FSLayers = append(manifest.FSLayers, schema1.FSLayer{BlobSum: layer.Digest}) - manifest.History = append(manifest.History, schema1.History{V1Compatibility: "{}"}) + m.FSLayers = append(m.FSLayers, schema1.FSLayer{BlobSum: layer.Digest}) + m.History = append(m.History, schema1.History{V1Compatibility: "{}"}) } pk, err := libtrust.GenerateECP256PrivateKey() @@ -51,7 +65,7 @@ func MakeSchema1Manifest(layers []distribution.Descriptor) (string, distribution return "", nil, fmt.Errorf("unexpected error generating private key: %v", err) } - signedManifest, err := schema1.Sign(&manifest, pk) + signedManifest, err := schema1.Sign(&m, pk) if err != nil { return "", nil, fmt.Errorf("error signing manifest: %v", err) } @@ -65,7 +79,7 @@ func MakeSchema2Manifest(config distribution.Descriptor, layers []distribution.D m := schema2.Manifest{ Versioned: schema2.SchemaVersion, Config: config, - Layers: make([]distribution.Descriptor, len(layers)), + Layers: make([]distribution.Descriptor, 0, len(layers)), } m.Config.MediaType = schema2.MediaTypeConfig @@ -127,7 +141,7 @@ func MakeManifestConfig() (ConfigPayload, distribution.Descriptor, error) { return jsonBytes, cfgDesc, nil } -func CreateRandomManifest(schemaVersion ManifestSchemaVesion, layerCount int) (string, distribution.Manifest, *Payload, error) { +func CreateRandomManifest(schemaVersion ManifestSchemaVersion, layerCount int) (string, distribution.Manifest, *Payload, error) { var ( rawManifest string manifest distribution.Manifest @@ -146,9 +160,9 @@ func CreateRandomManifest(schemaVersion ManifestSchemaVesion, layerCount int) (s switch schemaVersion { case ManifestSchema1: - rawManifest, manifest, err = MakeSchema1Manifest(layersDescs) + rawManifest, manifest, err = MakeSchema1Manifest("who", "cares", layersDescs) case ManifestSchema2: - payload.Config, cfgDesc, err = MakeManifestConfig() + _, cfgDesc, err = MakeManifestConfig() if err != nil { return "", nil, nil, err } @@ -159,3 +173,182 @@ func CreateRandomManifest(schemaVersion ManifestSchemaVesion, layerCount int) (s return rawManifest, manifest, payload, err } + +// CreateUploadTestManifest generates a random manifest blob and uploads it to the given repository. For this +// purpose, a given number of layers will be created and uploaded. +func CreateAndUploadTestManifest( + schemaVersion ManifestSchemaVersion, + layerCount int, + serverURL *url.URL, + creds auth.CredentialStore, + repoName, tag string, +) (dgst digest.Digest, canonical, manifestConfig string, manifest distribution.Manifest, err error) { + var ( + layerDescriptors = make([]distribution.Descriptor, 0, layerCount) + ) + + for i := 0; i < layerCount; i++ { + ds, _, err := UploadRandomTestBlob(serverURL, creds, repoName) + if err != nil { + return "", "", "", nil, fmt.Errorf("unexpected error generating test blob layer: %v", err) + } + layerDescriptors = append(layerDescriptors, ds) + } + + switch schemaVersion { + case ManifestSchema1: + canonical, manifest, err = MakeSchema1Manifest(repoName, tag, layerDescriptors) + if err != nil { + return "", "", "", nil, fmt.Errorf("failed to make manifest of schema 1: %v", err) + } + case ManifestSchema2: + cfgPayload, cfgDesc, err := MakeManifestConfig() + if err != nil { + return "", "", "", nil, err + } + _, err = UploadPayloadAsBlob(cfgPayload, serverURL, creds, repoName) + if err != nil { + return "", "", "", nil, fmt.Errorf("failed to upload manifest config of schema 2: %v", err) + } + canonical, manifest, err = MakeSchema2Manifest(cfgDesc, layerDescriptors) + if err != nil { + return "", "", "", nil, fmt.Errorf("failed to make manifest schema 2: %v", err) + } + manifestConfig = string(cfgPayload) + default: + return "", "", "", nil, fmt.Errorf("unsupported manifest version %d", schemaVersion) + } + + expectedDgst := digest.FromBytes([]byte(canonical)) + + ctx := context.Background() + ref, err := reference.ParseNamed(repoName) + if err != nil { + return "", "", "", nil, err + } + + var rt http.RoundTripper + if creds != nil { + challengeManager := auth.NewSimpleChallengeManager() + _, err := ping(challengeManager, serverURL.String()+"/v2/", "") + if err != nil { + return "", "", "", nil, err + } + rt = transport.NewTransport( + nil, + auth.NewAuthorizer( + challengeManager, + auth.NewTokenHandler(nil, creds, repoName, "pull", "push"), + auth.NewBasicHandler(creds))) + } + repo, err := distclient.NewRepository(ctx, ref, serverURL.String(), rt) + if err != nil { + return "", "", "", nil, fmt.Errorf("failed to get repository %q: %v", repoName, err) + } + + ms, err := repo.Manifests(ctx) + if err != nil { + return "", "", "", nil, err + } + dgst, err = ms.Put(ctx, manifest) + if err != nil { + return "", "", "", nil, err + } + + if expectedDgst != dgst { + return "", "", "", nil, fmt.Errorf("registry server computed different digest for uploaded manifest than expected: %q != %q", dgst, expectedDgst) + } + + return dgst, canonical, manifestConfig, manifest, nil +} + +// AssertManifestsEqual compares two manifests and returns if they are equal. Signatures of manifest schema 1 +// are not taken into account. +func AssertManifestsEqual(t *testing.T, description string, ma distribution.Manifest, mb distribution.Manifest) { + if ma == mb { + return + } + + if (ma == nil) != (mb == nil) { + t.Fatalf("[%s] only one of the manifests is nil", description) + } + + _, pa, err := ma.Payload() + if err != nil { + t.Fatalf("[%s] failed to get payload for first manifest: %v", description, err) + } + _, pb, err := mb.Payload() + if err != nil { + t.Fatalf("[%s] failed to get payload for second manifest: %v", description, err) + } + + var va, vb manifest.Versioned + if err := json.Unmarshal([]byte(pa), &va); err != nil { + t.Fatalf("[%s] failed to unmarshal payload of the first manifest: %v", description, err) + } + if err := json.Unmarshal([]byte(pb), &vb); err != nil { + t.Fatalf("[%s] failed to unmarshal payload of the second manifest: %v", description, err) + } + + if !reflect.DeepEqual(va, vb) { + t.Fatalf("[%s] manifests are of different version: %s", description, diff.ObjectGoPrintDiff(va, vb)) + } + + switch va.SchemaVersion { + case 1: + ms1a, ok := ma.(*schema1.SignedManifest) + if !ok { + t.Fatalf("[%s] failed to convert first manifest (%T) to schema1.SignedManifest", description, ma) + } + ms1b, ok := mb.(*schema1.SignedManifest) + if !ok { + t.Fatalf("[%s] failed to convert first manifest (%T) to schema1.SignedManifest", description, mb) + } + if !reflect.DeepEqual(ms1a.Manifest, ms1b.Manifest) { + t.Fatalf("[%s] manifests don't match: %s", description, diff.ObjectGoPrintDiff(ms1a.Manifest, ms1b.Manifest)) + } + + case 2: + if !reflect.DeepEqual(ma, mb) { + t.Fatalf("[%s] manifests don't match: %s", description, diff.ObjectGoPrintDiff(ma, mb)) + } + + default: + t.Fatalf("[%s] unrecognized manifest schema version: %d", description, va.SchemaVersion) + } +} + +// NewImageManifest creates a new Image object for the given manifest string. Note that the manifest must +// contain signatures if it is of schema 1. +func NewImageForManifest(repoName string, rawManifest string, manifestConfig string, managedByOpenShift bool) (*imageapi.Image, error) { + var versioned manifest.Versioned + if err := json.Unmarshal([]byte(rawManifest), &versioned); err != nil { + return nil, err + } + + _, desc, err := distribution.UnmarshalManifest(versioned.MediaType, []byte(rawManifest)) + if err != nil { + return nil, err + } + + annotations := make(map[string]string) + if managedByOpenShift { + annotations[imageapi.ManagedByOpenShiftAnnotation] = "true" + } + + img := &imageapi.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: desc.Digest.String(), + Annotations: annotations, + }, + DockerImageReference: fmt.Sprintf("localhost:5000/%s@%s", repoName, desc.Digest.String()), + DockerImageManifest: rawManifest, + DockerImageConfig: manifestConfig, + } + + if err := imageapi.ImageWithMetadata(img); err != nil { + return nil, fmt.Errorf("failed to fill image with metadata: %v", err) + } + + return img, nil +} diff --git a/pkg/dockerregistry/testutil/util.go b/pkg/dockerregistry/testutil/util.go index 991a4be31a68..2ba58f2503cc 100644 --- a/pkg/dockerregistry/testutil/util.go +++ b/pkg/dockerregistry/testutil/util.go @@ -4,7 +4,6 @@ import ( "archive/tar" "bytes" "crypto/rand" - "encoding/json" "fmt" "io" "io/ioutil" @@ -17,7 +16,6 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest" "github.com/docker/distribution/reference" distclient "github.com/docker/distribution/registry/client" "github.com/docker/distribution/registry/client/auth" @@ -31,46 +29,15 @@ import ( imageapi "github.com/openshift/origin/pkg/image/api" ) -func NewImageForManifest(repoName string, rawManifest string, managedByOpenShift bool) (*imageapi.Image, error) { - var versioned manifest.Versioned - if err := json.Unmarshal([]byte(rawManifest), &versioned); err != nil { - return nil, err - } - - _, desc, err := distribution.UnmarshalManifest(versioned.MediaType, []byte(rawManifest)) - if err != nil { - return nil, err - } - - annotations := make(map[string]string) - if managedByOpenShift { - annotations[imageapi.ManagedByOpenShiftAnnotation] = "true" - } - - img := &imageapi.Image{ - ObjectMeta: kapi.ObjectMeta{ - Name: desc.Digest.String(), - Annotations: annotations, - }, - DockerImageReference: fmt.Sprintf("localhost:5000/%s@%s", repoName, desc.Digest.String()), - DockerImageManifest: string(rawManifest), - } - - if err := imageapi.ImageWithMetadata(img); err != nil { - return nil, err - } - - return img, nil -} - -// UploadTestBlob generates a random tar file and uploads it to the given repository. -func UploadTestBlob(serverURL *url.URL, creds auth.CredentialStore, repoName string) (distribution.Descriptor, []byte, error) { - rs, ds, err := CreateRandomTarFile() - if err != nil { - return distribution.Descriptor{}, nil, fmt.Errorf("unexpected error generating test layer file: %v", err) - } - dgst := digest.Digest(ds) - +// UploadTestBlobFromReader uploads a testing blob read from the given reader to the registry located at the +// given URL. +func UploadTestBlobFromReader( + dgst digest.Digest, + reader io.ReadSeeker, + serverURL *url.URL, + creds auth.CredentialStore, + repoName string, +) (distribution.Descriptor, []byte, error) { ctx := context.Background() ref, err := reference.ParseNamed(repoName) if err != nil { @@ -100,7 +67,7 @@ func UploadTestBlob(serverURL *url.URL, creds auth.CredentialStore, repoName str if err != nil { return distribution.Descriptor{}, nil, err } - if _, err := io.Copy(wr, rs); err != nil { + if _, err := io.Copy(wr, reader); err != nil { return distribution.Descriptor{}, nil, fmt.Errorf("unexpected error copying to upload: %v", err) } desc, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}) @@ -108,10 +75,10 @@ func UploadTestBlob(serverURL *url.URL, creds auth.CredentialStore, repoName str return distribution.Descriptor{}, nil, err } - if _, err := rs.Seek(0, 0); err != nil { + if _, err := reader.Seek(0, 0); err != nil { return distribution.Descriptor{}, nil, fmt.Errorf("failed to seak blob reader: %v", err) } - content, err := ioutil.ReadAll(rs) + content, err := ioutil.ReadAll(reader) if err != nil { return distribution.Descriptor{}, nil, fmt.Errorf("failed to read blob content: %v", err) } @@ -119,6 +86,33 @@ func UploadTestBlob(serverURL *url.URL, creds auth.CredentialStore, repoName str return desc, content, nil } +// UploadPayloadAsBlob uploads a given payload to the registry serving at the given URL. +func UploadPayloadAsBlob( + payload []byte, + serverURL *url.URL, + creds auth.CredentialStore, + repoName string, +) (distribution.Descriptor, error) { + reader := bytes.NewReader(payload) + dgst := digest.FromBytes(payload) + desc, _, err := UploadTestBlobFromReader(dgst, reader, serverURL, creds, repoName) + return desc, err +} + +// UploadRandomTestBlob generates a random tar file and uploads it to the given repository. +func UploadRandomTestBlob( + serverURL *url.URL, + creds auth.CredentialStore, + repoName string, +) (distribution.Descriptor, []byte, error) { + rs, ds, err := CreateRandomTarFile() + if err != nil { + return distribution.Descriptor{}, nil, fmt.Errorf("unexpected error generating test layer file: %v", err) + } + dgst := digest.Digest(ds) + return UploadTestBlobFromReader(dgst, rs, serverURL, creds, repoName) +} + // createRandomTarFile creates a random tarfile, returning it as an io.ReadSeeker along with its digest. An // error is returned if there is a problem generating valid content. Inspired by // github.com/vendor/docker/distribution/testutil/tarfile.go. @@ -244,6 +238,28 @@ func GetFakeImageGetHandler(t *testing.T, imgs ...imageapi.Image) core.ReactionF } } +// GetFakeImageStreamGetHandler creates a test handler to be used as a reactor with core.Fake client +// that handles Get request on image stream resource. Matching is from given image stream list will be +// returned if found. Additionally, a shared image stream may be requested. +func GetFakeImageStreamGetHandler(t *testing.T, iss ...imageapi.ImageStream) core.ReactionFunc { + return func(action core.Action) (handled bool, ret runtime.Object, err error) { + switch a := action.(type) { + case core.GetAction: + for _, is := range iss { + if is.Namespace == a.GetNamespace() && a.GetName() == is.Name { + t.Logf("imagestream get handler: returning image stream %s/%s", is.Namespace, is.Name) + return true, &is, nil + } + } + + err := kerrors.NewNotFound(kapi.Resource("imageStreams"), a.GetName()) + t.Logf("imagestream get handler: %v", err) + return true, nil, err + } + return false, nil, nil + } +} + // TestNewImageStreamObject returns a new image stream object filled with given values. func TestNewImageStreamObject(namespace, name, tag, imageName, dockerImageReference string) *imageapi.ImageStream { return &imageapi.ImageStream{ diff --git a/pkg/image/api/helper.go b/pkg/image/api/helper.go index 7e33e59f07df..21c593ef1bea 100644 --- a/pkg/image/api/helper.go +++ b/pkg/image/api/helper.go @@ -455,9 +455,12 @@ func ImageWithMetadata(image *Image) error { case 2: image.DockerImageManifestMediaType = schema2.MediaTypeManifest + if len(image.DockerImageConfig) == 0 { + return fmt.Errorf("dockerImageConfig must not be empty for manifest schema 2") + } config := DockerImageConfig{} if err := json.Unmarshal([]byte(image.DockerImageConfig), &config); err != nil { - return err + return fmt.Errorf("failed to parse dockerImageConfig: %v", err) } image.DockerImageLayers = make([]ImageLayer, len(manifest.Layers)) diff --git a/test/integration/dockerregistry_pullthrough_test.go b/test/integration/dockerregistry_pullthrough_test.go index fc8811a371b0..d436328b156f 100644 --- a/test/integration/dockerregistry_pullthrough_test.go +++ b/test/integration/dockerregistry_pullthrough_test.go @@ -291,7 +291,7 @@ func TestPullThroughInsecure(t *testing.T) { t.Fatal(err) } - t.Logf("Run testPullThroughStatBlob (%s == true)...", imageapi.InsecureRepositoryAnnotation) + t.Logf("Run testPullThroughStatBlob (%s == true, spec.tags[%q].importPolicy.insecure == true)...", imageapi.InsecureRepositoryAnnotation, repotag) for digest := range descriptors { if err := testPullThroughStatBlob(&stream, user, token, digest); err != nil { t.Fatal(err) @@ -309,10 +309,31 @@ func TestPullThroughInsecure(t *testing.T) { t.Fatal(err) } - t.Logf("Run testPullThroughStatBlob (%s == false)...", imageapi.InsecureRepositoryAnnotation) + t.Logf("Run testPullThroughStatBlob (%s == false, spec.tags[%q].importPolicy.insecure == true)...", imageapi.InsecureRepositoryAnnotation, repotag) + for digest := range descriptors { + if err := testPullThroughStatBlob(&stream, user, token, digest); err != nil { + t.Fatal(err) + } + } + + istream, err = adminClient.ImageStreams(stream.Namespace).Get(stream.Name) + if err != nil { + t.Fatal(err) + } + tagRef := istream.Spec.Tags[repotag] + tagRef.ImportPolicy.Insecure = false + istream.Spec.Tags[repotag] = tagRef + _, err = adminClient.ImageStreams(istream.Namespace).Update(istream) + if err != nil { + t.Fatal(err) + } + + t.Logf("Run testPullThroughStatBlob (%s == false, spec.tags[%q].importPolicy.insecure == false)...", imageapi.InsecureRepositoryAnnotation, repotag) for digest := range descriptors { if err := testPullThroughStatBlob(&stream, user, token, digest); err == nil { t.Fatal("unexpexted access to insecure blobs") + } else { + t.Logf("%#+v", err) } } } diff --git a/test/integration/v2_docker_registry_test.go b/test/integration/v2_docker_registry_test.go index 9a468b748985..3f195a83a481 100644 --- a/test/integration/v2_docker_registry_test.go +++ b/test/integration/v2_docker_registry_test.go @@ -281,7 +281,7 @@ middleware: func putManifest(name, user, token string) (digest.Digest, error) { creds := registryutil.NewBasicCredentialStore(user, token) - desc, _, err := registryutil.UploadTestBlob(&url.URL{Host: "127.0.0.1:5000", Scheme: "http"}, creds, testutil.Namespace()+"/"+name) + desc, _, err := registryutil.UploadRandomTestBlob(&url.URL{Host: "127.0.0.1:5000", Scheme: "http"}, creds, testutil.Namespace()+"/"+name) if err != nil { return "", err }