Skip to content

Commit 8e24597

Browse files
committed
internal/mod/modmux: new package
This implements the actual registry multiplexing logic that will be used to implement split-horizon registries. Signed-off-by: Roger Peppe <[email protected]> Change-Id: Ibadb032334c9af04a1683825084ffcdd4bd8a3e2 Reviewed-on: https://review-eu.gerrithub.io/c/cue-lang/cue/+/1170883 TryBot-Result: CUEcueckoo <[email protected]> Unity-Result: CUE porcuepine <[email protected]> Reviewed-by: Daniel Martí <[email protected]>
1 parent eddccfc commit 8e24597

File tree

2 files changed

+383
-0
lines changed

2 files changed

+383
-0
lines changed

internal/mod/modmux/mux.go

+209
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Copyright 2023 CUE Labs AG
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package modmux
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"io"
21+
"sync"
22+
23+
"cuelabs.dev/go/oci/ociregistry"
24+
25+
"cuelang.org/go/internal/mod/modresolve"
26+
)
27+
28+
// New returns a registry implementation that uses the given
29+
// resolver to multiplex between different registries.
30+
//
31+
// The newRegistry function will be used to create the
32+
// registries for the hosts in the [modresolver.Location] values
33+
// returned by the resolver.
34+
//
35+
// The returned registry always returns an error for Repositories and MountBlob
36+
// (neither of these capabilities are required or used by the module fetching/pushing
37+
// logic).
38+
func New(resolver modresolve.Resolver, newRegistry func(host string, insecure bool) (ociregistry.Interface, error)) ociregistry.Interface {
39+
return &registry{
40+
resolver: resolver,
41+
newRegistry: newRegistry,
42+
repos: make(map[string]ociregistry.Interface),
43+
}
44+
}
45+
46+
type registry struct {
47+
*ociregistry.Funcs
48+
resolver modresolve.Resolver
49+
newRegistry func(host string, insecure bool) (ociregistry.Interface, error)
50+
51+
mu sync.Mutex
52+
repos map[string]ociregistry.Interface
53+
}
54+
55+
func (r *registry) GetBlob(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.BlobReader, error) {
56+
cr, repo, err := r.resolve(repo)
57+
if err != nil {
58+
return nil, err
59+
}
60+
return cr.GetBlob(ctx, repo, digest)
61+
}
62+
63+
func (r *registry) GetBlobRange(ctx context.Context, repo string, digest ociregistry.Digest, offset0, offset1 int64) (ociregistry.BlobReader, error) {
64+
cr, repo, err := r.resolve(repo)
65+
if err != nil {
66+
return nil, err
67+
}
68+
return cr.GetBlobRange(ctx, repo, digest, offset0, offset1)
69+
}
70+
71+
func (r *registry) GetManifest(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.BlobReader, error) {
72+
cr, repo, err := r.resolve(repo)
73+
if err != nil {
74+
return nil, err
75+
}
76+
return cr.GetManifest(ctx, repo, digest)
77+
}
78+
79+
func (r *registry) GetTag(ctx context.Context, repo string, tagName string) (ociregistry.BlobReader, error) {
80+
cr, repo, err := r.resolve(repo)
81+
if err != nil {
82+
return nil, err
83+
}
84+
return cr.GetTag(ctx, repo, tagName)
85+
}
86+
87+
func (r *registry) ResolveBlob(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
88+
cr, repo, err := r.resolve(repo)
89+
if err != nil {
90+
return ociregistry.Descriptor{}, err
91+
}
92+
return cr.ResolveBlob(ctx, repo, digest)
93+
}
94+
95+
func (r *registry) ResolveManifest(ctx context.Context, repo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
96+
cr, repo, err := r.resolve(repo)
97+
if err != nil {
98+
return ociregistry.Descriptor{}, err
99+
}
100+
return cr.ResolveManifest(ctx, repo, digest)
101+
}
102+
103+
func (r *registry) ResolveTag(ctx context.Context, repo string, tagName string) (ociregistry.Descriptor, error) {
104+
cr, repo, err := r.resolve(repo)
105+
if err != nil {
106+
return ociregistry.Descriptor{}, err
107+
}
108+
return cr.ResolveTag(ctx, repo, tagName)
109+
}
110+
111+
func (r *registry) PushBlob(ctx context.Context, repo string, desc ociregistry.Descriptor, rd io.Reader) (ociregistry.Descriptor, error) {
112+
cr, repo, err := r.resolve(repo)
113+
if err != nil {
114+
return ociregistry.Descriptor{}, err
115+
}
116+
return cr.PushBlob(ctx, repo, desc, rd)
117+
}
118+
119+
func (r *registry) PushBlobChunked(ctx context.Context, repo string, id string, chunkSize int) (ociregistry.BlobWriter, error) {
120+
cr, repo, err := r.resolve(repo)
121+
if err != nil {
122+
return nil, err
123+
}
124+
return cr.PushBlobChunked(ctx, repo, id, chunkSize)
125+
}
126+
127+
func (r *registry) MountBlob(ctx context.Context, fromRepo, toRepo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
128+
return ociregistry.Descriptor{}, ociregistry.ErrUnsupported
129+
}
130+
131+
func (r *registry) PushManifest(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (ociregistry.Descriptor, error) {
132+
cr, repo, err := r.resolve(repo)
133+
if err != nil {
134+
return ociregistry.Descriptor{}, err
135+
}
136+
return cr.PushManifest(ctx, repo, tag, contents, mediaType)
137+
}
138+
139+
func (r *registry) DeleteBlob(ctx context.Context, repo string, digest ociregistry.Digest) error {
140+
cr, repo, err := r.resolve(repo)
141+
if err != nil {
142+
return err
143+
}
144+
return cr.DeleteBlob(ctx, repo, digest)
145+
}
146+
147+
func (r *registry) DeleteManifest(ctx context.Context, repo string, digest ociregistry.Digest) error {
148+
cr, repo, err := r.resolve(repo)
149+
if err != nil {
150+
return err
151+
}
152+
return cr.DeleteManifest(ctx, repo, digest)
153+
}
154+
155+
func (r *registry) DeleteTag(ctx context.Context, repo string, name string) error {
156+
cr, repo, err := r.resolve(repo)
157+
if err != nil {
158+
return err
159+
}
160+
return cr.DeleteTag(ctx, repo, name)
161+
}
162+
163+
func (r *registry) Repositories(ctx context.Context) ociregistry.Iter[string] {
164+
return ociregistry.ErrorIter[string](ociregistry.ErrUnsupported)
165+
}
166+
167+
func (r *registry) Tags(ctx context.Context, repo string) ociregistry.Iter[string] {
168+
cr, repo, err := r.resolve(repo)
169+
if err != nil {
170+
return ociregistry.ErrorIter[string](err)
171+
}
172+
return cr.Tags(ctx, repo)
173+
}
174+
175+
func (r *registry) Referrers(ctx context.Context, repo string, digest ociregistry.Digest, artifactType string) ociregistry.Iter[ociregistry.Descriptor] {
176+
cr, repo, err := r.resolve(repo)
177+
if err != nil {
178+
return ociregistry.ErrorIter[ociregistry.Descriptor](err)
179+
}
180+
return cr.Referrers(ctx, repo, digest, artifactType)
181+
}
182+
183+
func (r *registry) resolve(repo string) (reg ociregistry.Interface, repo1 string, err error) {
184+
loc := r.resolver.Resolve(repo)
185+
r.mu.Lock()
186+
defer r.mu.Unlock()
187+
reg = r.repos[loc.Host]
188+
if reg == nil {
189+
reg1, err := r.newRegistry(loc.Host, loc.Insecure)
190+
if err != nil {
191+
return nil, "", fmt.Errorf("cannot make client: %v", err)
192+
}
193+
r.repos[loc.Host] = reg1
194+
reg = reg1
195+
}
196+
return reg, join(loc.Prefix, repo), nil
197+
}
198+
199+
// join is similar to path.Join but doesn't Clean the result, because
200+
// that's not appropriate in this scenario.
201+
func join(prefix, repo string) string {
202+
if prefix == "" {
203+
return repo
204+
}
205+
if repo == "" {
206+
return prefix
207+
}
208+
return prefix + "/" + repo
209+
}

internal/mod/modmux/mux_test.go

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package modmux
2+
3+
import (
4+
"archive/zip"
5+
"bytes"
6+
"context"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"io/fs"
11+
"io/ioutil"
12+
"testing"
13+
14+
"cuelabs.dev/go/oci/ociregistry"
15+
"cuelabs.dev/go/oci/ociregistry/ociclient"
16+
"cuelabs.dev/go/oci/ociregistry/ocifilter"
17+
"cuelabs.dev/go/oci/ociregistry/ocimem"
18+
"github.com/go-quicktest/qt"
19+
"golang.org/x/tools/txtar"
20+
21+
"cuelang.org/go/internal/mod/modregistry"
22+
"cuelang.org/go/internal/mod/modresolve"
23+
"cuelang.org/go/internal/mod/module"
24+
modzip "cuelang.org/go/internal/mod/zip"
25+
"cuelang.org/go/internal/registrytest"
26+
)
27+
28+
const contents = `
29+
-- r0/example.com_v0.0.1/cue.mod/module.cue --
30+
module: "example.com@v0"
31+
32+
-- r0/example.com_v0.0.1/x.cue --
33+
package x
34+
"r0/example.com_v0.0.1"
35+
36+
-- r0/example.com_v0.0.2/cue.mod/module.cue --
37+
module: "example.com@v0"
38+
39+
-- r0/example.com_v0.0.2/x.cue --
40+
package x
41+
"r0/example.com_v0.0.2"
42+
43+
-- r0/example.com_foo_v0.0.1/cue.mod/module.cue --
44+
module: "example.com/foo@v0"
45+
46+
-- r0/example.com_foo_v0.0.1/x.cue --
47+
package x
48+
"r0/example.com_foo_v0.0.1"
49+
50+
-- r1/example.com_foo_v0.0.1/cue.mod/module.cue --
51+
module: "example.com/foo@v0"
52+
53+
-- r1/example.com_foo_v0.0.1/x.cue --
54+
package x
55+
"r1/example.com_foo_v0.0.1"
56+
`
57+
58+
func TestMux(t *testing.T) {
59+
rfs := registrytest.TxtarFS(txtar.Parse([]byte(contents)))
60+
const numRegistries = 2
61+
registries := make([]*registrytest.Registry, numRegistries)
62+
for i := 0; i < numRegistries; i++ {
63+
rfs1, _ := fs.Sub(rfs, fmt.Sprintf("r%d", i))
64+
// TODO non-empty prefixes.
65+
r, err := registrytest.New(rfs1, "")
66+
qt.Assert(t, qt.IsNil(err), qt.Commentf("r%d", i))
67+
registries[i] = r
68+
defer r.Close()
69+
}
70+
resolver, err := modresolve.ParseCUERegistry(fmt.Sprintf("example.com=%s,example.com/foo=%s", registries[0].Host(), registries[1].Host()), "fallback.registry/subdir")
71+
qt.Assert(t, qt.IsNil(err))
72+
73+
fallback := ocimem.New()
74+
75+
muxr := New(resolver, func(host string, insecure bool) (ociregistry.Interface, error) {
76+
if host == "fallback.registry" {
77+
return fallback, nil
78+
}
79+
return ociclient.New(host, &ociclient.Options{
80+
Insecure: insecure,
81+
})
82+
})
83+
ctx := context.Background()
84+
modc := modregistry.NewClient(muxr)
85+
86+
qt.Assert(t, qt.StringContains(fetchXCUE(t, modc, "example.com", "v0.0.1"), `"r0/example.com_v0.0.1"`))
87+
qt.Assert(t, qt.StringContains(fetchXCUE(t, modc, "example.com/foo", "v0.0.1"), `"r1/example.com_foo_v0.0.1"`))
88+
89+
versions, err := modc.ModuleVersions(ctx, "example.com@v0")
90+
qt.Assert(t, qt.DeepEquals(versions, []string{"v0.0.1", "v0.0.2"}))
91+
versions, err = modc.ModuleVersions(ctx, "example.com/foo@v0")
92+
qt.Assert(t, qt.DeepEquals(versions, []string{"v0.0.1"}))
93+
94+
// Verify that we can put a module too.
95+
var zbuf bytes.Buffer
96+
zw := zip.NewWriter(&zbuf)
97+
zipAddFS(zw, registrytest.TxtarFS(txtar.Parse([]byte(`
98+
-- cue.mod/module.cue --
99+
module: "other.com/a/b@v1"
100+
-- x.cue --
101+
package x
102+
"other.com/a/[email protected]"
103+
`))))
104+
zw.Close()
105+
err = modc.PutModule(ctx, module.MustNewVersion("other.com/a/b", "v1.2.3"), bytes.NewReader(zbuf.Bytes()), int64(zbuf.Len()))
106+
qt.Assert(t, qt.IsNil(err))
107+
108+
qt.Assert(t, qt.StringContains(fetchXCUE(t, modc, "other.com/a/b", "v1.2.3"), `"other.com/a/[email protected]"`))
109+
110+
// Check that the module we've just put ended up in the correct place.
111+
modc1 := modregistry.NewClient(ocifilter.Sub(fallback, "subdir"))
112+
113+
qt.Assert(t, qt.StringContains(fetchXCUE(t, modc1, "other.com/a/b", "v1.2.3"), `"other.com/a/[email protected]"`))
114+
}
115+
116+
// fetchXCUE returns the contents of the x.cue file inside the
117+
// module with the given path and version.
118+
func fetchXCUE(t *testing.T, mclient *modregistry.Client, mpath string, vers string) string {
119+
ctx := context.Background()
120+
121+
mv := module.MustNewVersion(mpath, vers)
122+
m, err := mclient.GetModule(ctx, mv)
123+
qt.Assert(t, qt.IsNil(err))
124+
mzipr, err := m.GetZip(ctx)
125+
qt.Assert(t, qt.IsNil(err))
126+
data, err := io.ReadAll(mzipr)
127+
qt.Assert(t, qt.IsNil(err))
128+
zipr, _, _, err := modzip.CheckZip(mv, bytes.NewReader(data), int64(len(data)))
129+
qt.Assert(t, qt.IsNil(err))
130+
f, err := zipr.Open("x.cue")
131+
qt.Assert(t, qt.IsNil(err))
132+
fdata, err := ioutil.ReadAll(f)
133+
qt.Assert(t, qt.IsNil(err))
134+
return string(fdata)
135+
}
136+
137+
// zipAddFS adds all the files from fsys to zw.
138+
// It's copied from zip.Writer.AddFS.
139+
// TODO remove this when we can use go1.22's implementation
140+
// directly.
141+
func zipAddFS(w *zip.Writer, fsys fs.FS) error {
142+
return fs.WalkDir(fsys, ".", func(name string, d fs.DirEntry, err error) error {
143+
if err != nil {
144+
return err
145+
}
146+
if d.IsDir() {
147+
return nil
148+
}
149+
info, err := d.Info()
150+
if err != nil {
151+
return err
152+
}
153+
if !info.Mode().IsRegular() {
154+
return errors.New("zip: cannot add non-regular file")
155+
}
156+
h, err := zip.FileInfoHeader(info)
157+
if err != nil {
158+
return err
159+
}
160+
h.Name = name
161+
h.Method = zip.Deflate
162+
fw, err := w.CreateHeader(h)
163+
if err != nil {
164+
return err
165+
}
166+
f, err := fsys.Open(name)
167+
if err != nil {
168+
return err
169+
}
170+
defer f.Close()
171+
_, err = io.Copy(fw, f)
172+
return err
173+
})
174+
}

0 commit comments

Comments
 (0)