-
Notifications
You must be signed in to change notification settings - Fork 555
/
Copy pathwrite.go
293 lines (254 loc) · 7.91 KB
/
write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"bytes"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"github.com/google/go-containerregistry/authn"
"github.com/google/go-containerregistry/name"
"github.com/google/go-containerregistry/v1"
"github.com/google/go-containerregistry/v1/remote/transport"
)
// WriteOptions carries the optional settings that guide or control how Write
// pushes an image.
type WriteOptions struct {
// MountPaths is the set of repositories from which to attempt to mount blobs.
// Write requests pull scope on each of them, and their repository names are
// sent as the "from" sources when a blob upload is initiated.
MountPaths []name.Repository
// TODO(mattmoor): Expose "threads" to limit parallelism?
}
// Write uploads img to the remote location named by ref.
//
// Every member of the image's BlobSet is uploaded in its own goroutine. Only
// after all of those uploads have finished without error is the manifest
// pushed, which commits the image. If any blob upload fails, the first failure
// received is returned and the manifest is not pushed.
//
// auth and t are handed to transport.New to build the HTTP transport; wo
// supplies optional settings such as the repositories to try mounting from.
func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.RoundTripper,
	wo WriteOptions) error {

	// Ask for push access to the target and pull access to every mount source.
	scopes := []string{ref.Scope(transport.PushScope)}
	for _, src := range wo.MountPaths {
		scopes = append(scopes, src.Scope(transport.PullScope))
	}

	rt, err := transport.New(ref.Context().Registry, auth, t, scopes)
	if err != nil {
		return err
	}

	w := &writer{
		ref:     ref,
		client:  &http.Client{Transport: rt},
		img:     img,
		options: wo,
	}

	blobs, err := img.BlobSet()
	if err != nil {
		return err
	}

	// Fan out: one goroutine per blob, each reporting exactly one result.
	results := make(chan error)
	defer close(results)
	for digest := range blobs {
		digest := digest
		go func() {
			results <- w.uploadOne(digest)
		}()
	}

	// Fan in: receive one result per blob so that no goroutine is left blocked
	// on its send, remembering only the first failure.
	var firstErr error
	for range blobs {
		if uploadErr := <-results; uploadErr != nil && firstErr == nil {
			firstErr = uploadErr
		}
	}
	if firstErr != nil {
		return firstErr
	}

	// All constituent blobs are in place; pushing the manifest commits the image.
	return w.commitImage()
}
// writer writes the elements of an image to a remote image reference. It
// bundles the state shared by the individual steps of a single push.
type writer struct {
// ref is the destination reference; it supplies the registry and repository
// used to build request URLs, and the identifier the manifest is PUT under.
ref name.Reference
// client issues every HTTP request made during the push.
client *http.Client
// img is the image being pushed: the source of its layers, raw manifest,
// media type and digest.
img v1.Image
// options holds the caller-supplied WriteOptions (e.g. the mount sources).
options WriteOptions
}
// url builds a url.URL for path on the registry that hosts this writer's
// image reference. The scheme comes from transport.Scheme and the host is the
// registry's string form; no query string is set.
func (w *writer) url(path string) url.URL {
	repo := w.ref.Context()

	var u url.URL
	u.Scheme = transport.Scheme(repo.Registry)
	u.Host = repo.RegistryStr()
	u.Path = path
	return u
}
// nextLocation returns the fully-qualified URL to which the next request of an
// upload sequence should be sent, taken from the Location header of resp.
//
// An error is returned when the header is absent or cannot be parsed as a URL.
func (w *writer) nextLocation(resp *http.Response) (string, error) {
	location := resp.Header.Get("Location")
	if location == "" {
		return "", errors.New("missing Location header")
	}

	ref, err := url.Parse(location)
	if err != nil {
		return "", err
	}

	// The header may hold only a URL path, possibly with an embedded query
	// string, so w.url cannot be used to qualify it. Resolve it against the URL
	// of the request that produced this response instead.
	absolute := resp.Request.URL.ResolveReference(ref)
	return absolute.String(), nil
}
// initiateUpload starts the upload of the blob identified by h by POSTing to
// the repository's blob-uploads endpoint. The request always carries the blob's
// digest as the "mount" parameter and, when mount paths were configured, those
// repositories as "from" sources.
//
// Results:
//   - mounted == true: the registry answered 201 Created, meaning the blob is
//     already available and nothing more needs to be sent; location is empty.
//   - mounted == false with a nil error: the registry answered 202 Accepted and
//     location is the URL to which the blob contents should be streamed.
//   - a non-nil err: the request failed, the status was rejected by checkError,
//     or the Location header was missing or unparsable.
func (w *writer) initiateUpload(h v1.Hash) (location string, mounted bool, err error) {
	u := w.url(fmt.Sprintf("/v2/%s/blobs/uploads/", w.ref.Context().RepositoryStr()))
	uv := url.Values{
		"mount": []string{h.String()},
	}

	from := make([]string, 0, len(w.options.MountPaths))
	for _, m := range w.options.MountPaths {
		from = append(from, m.RepositoryStr())
	}
	// We currently avoid HEAD because it's semi-redundant with the mount that is part
	// of initiating the blob upload. GCR will perform an existence check on the initiation
	// if "mount" is specified, even if no "from" sources are specified. If this turns out
	// to not be broadly applicable then we should replace mounts without "from"s with a HEAD.
	if len(from) > 0 {
		uv["from"] = from
	}
	u.RawQuery = uv.Encode()

	// Make the request to initiate the blob upload.
	resp, err := w.client.Post(u.String(), "application/json", nil)
	if err != nil {
		return "", false, err
	}
	defer resp.Body.Close()

	if err := checkError(resp, http.StatusCreated, http.StatusAccepted); err != nil {
		return "", false, err
	}

	// Check the response code to determine the result.
	switch resp.StatusCode {
	case http.StatusCreated:
		// We're done, we were able to fast-path.
		return "", true, nil
	case http.StatusAccepted:
		// Proceed to PATCH, upload has begun.
		loc, err := w.nextLocation(resp)
		return loc, false, err
	default:
		// checkError is expected to have rejected every other status already.
		// Should that ever not hold, report an error to the caller rather than
		// panicking inside library code.
		return "", false, fmt.Errorf("unexpected status code %d initiating blob upload", resp.StatusCode)
	}
}
// streamBlob sends the compressed contents of the layer identified by h to
// streamLocation using a PATCH request.
//
// On success it returns the Location header of the response (fully qualified),
// which is where the upload must subsequently be committed. On failure it
// returns an empty location and the error.
func (w *writer) streamBlob(h v1.Hash, streamLocation string) (commitLocation string, err error) {
	layer, err := w.img.LayerByDigest(h)
	if err != nil {
		return "", err
	}

	body, err := layer.Compressed()
	if err != nil {
		return "", err
	}
	defer body.Close()

	patch, err := http.NewRequest(http.MethodPatch, streamLocation, body)
	if err != nil {
		return "", err
	}

	resp, err := w.client.Do(patch)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	statusErr := checkError(resp, http.StatusNoContent, http.StatusAccepted, http.StatusCreated)
	if statusErr != nil {
		return "", statusErr
	}

	// The contents are uploaded; the response tells us where to commit them.
	commitLocation, err = w.nextLocation(resp)
	return commitLocation, err
}
// commitBlob finalizes the upload of the blob identified by h. It issues a
// body-less PUT to location (the URL returned from streaming the blob) with
// the blob's digest added as the "digest" query parameter, and expects a
// 201 Created response.
func (w *writer) commitBlob(h v1.Hash, location string) (err error) {
	target, err := url.Parse(location)
	if err != nil {
		return err
	}

	// Keep whatever query parameters the location already carries and
	// add/overwrite the digest.
	query := target.Query()
	query.Set("digest", h.String())
	target.RawQuery = query.Encode()

	put, err := http.NewRequest(http.MethodPut, target.String(), nil)
	if err != nil {
		return err
	}

	resp, err := w.client.Do(put)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	return checkError(resp, http.StatusCreated)
}
// uploadOne runs the full upload sequence for the single blob identified by h:
// initiate the upload, and unless the blob was mounted, stream its contents
// and commit it. The outcome is logged on success.
func (w *writer) uploadOne(h v1.Hash) error {
	streamLoc, mounted, err := w.initiateUpload(h)
	if err != nil {
		return err
	}
	if mounted {
		// Nothing to transfer: the registry already has this blob.
		log.Printf("mounted blob: %v", h)
		return nil
	}

	commitLoc, err := w.streamBlob(h, streamLoc)
	if err != nil {
		return err
	}

	err = w.commitBlob(h, commitLoc)
	if err != nil {
		return err
	}

	log.Printf("pushed blob %v", h)
	return nil
}
// commitImage PUTs the image's raw manifest to the manifests endpoint for this
// writer's reference, using the image's media type as the Content-Type. After
// the registry accepts it (200, 201 or 202), the reference, digest and
// manifest size are printed to standard output.
func (w *writer) commitImage() error {
	manifest, err := w.img.RawManifest()
	if err != nil {
		return err
	}

	mediaType, err := w.img.MediaType()
	if err != nil {
		return err
	}

	manifestPath := fmt.Sprintf("/v2/%s/manifests/%s", w.ref.Context().RepositoryStr(), w.ref.Identifier())
	endpoint := w.url(manifestPath)

	// Build the PUT carrying the serialized manifest.
	put, err := http.NewRequest(http.MethodPut, endpoint.String(), bytes.NewBuffer(manifest))
	if err != nil {
		return err
	}
	put.Header.Set("Content-Type", string(mediaType))

	resp, err := w.client.Do(put)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	statusErr := checkError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted)
	if statusErr != nil {
		return statusErr
	}

	digest, err := w.img.Digest()
	if err != nil {
		return err
	}

	// The image was successfully pushed!
	fmt.Printf("%v: digest: %v size: %d\n", w.ref, digest, len(manifest))
	return nil
}
// TODO(mattmoor): WriteIndex