Skip to content

Commit

Permalink
mutate: allow for custom compression
Browse files Browse the repository at this point in the history
at least via the API for now :)

Signed-off-by: Tycho Andersen <[email protected]>
  • Loading branch information
tych0 committed Sep 29, 2020
1 parent 1f22d88 commit 5d0bf68
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/umoci/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func insert(ctx *cli.Context) error {

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayerGzip, reader, history); err != nil {
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, reader, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/umoci/raw-add-layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func rawAddLayer(ctx *cli.Context) error {

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayerGzip, newLayer, history); err != nil {
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, newLayer, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}

Expand Down
69 changes: 69 additions & 0 deletions mutate/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package mutate

import (
"io"
"io/ioutil"
"runtime"

gzip "github.com/klauspost/pgzip"
"github.com/pkg/errors"
)

// Compressor is an interface which users can use to implement different
// compression types.
type Compressor interface {
// Compress sets up the streaming compressor for this compression type.
Compress(io.Reader) (io.ReadCloser, error)

// MediaTypeSuffix returns the suffix to be added to the layer to
// indicate what compression type is used, e.g. "gzip", or "" for no
// compression.
MediaTypeSuffix() string
}

type noopCompressor struct{}

func (nc noopCompressor) Compress(r io.Reader) (io.ReadCloser, error) {
return ioutil.NopCloser(r), nil
}

func (nc noopCompressor) MediaTypeSuffix() string {
return ""
}

// NoopCompressor provides no compression.
var NoopCompressor Compressor = noopCompressor{}

// GzipCompressor provides gzip compression.
var GzipCompressor Compressor = gzipCompressor{}

type gzipCompressor struct{}

func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()

gzw := gzip.NewWriter(pipeWriter)
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
}
go func() {
if _, err := io.Copy(gzw, reader); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
}
if err := gzw.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close gzip writer"))
}
if err := pipeWriter.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close pipe writer"))
}
}()

return pipeReader, nil
}

func (gz gzipCompressor) MediaTypeSuffix() string {
return "gzip"
}
47 changes: 47 additions & 0 deletions mutate/compress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mutate

import (
"bytes"
"io/ioutil"
"testing"

gzip "github.com/klauspost/pgzip"
"github.com/stretchr/testify/assert"
)

const (
fact = "meshuggah rocks!!!"
)

func TestNoopCompressor(t *testing.T) {
assert := assert.New(t)
buf := bytes.NewBufferString(fact)

r, err := NoopCompressor.Compress(buf)
assert.NoError(err)
assert.Equal(NoopCompressor.MediaTypeSuffix(), "")

content, err := ioutil.ReadAll(r)
assert.NoError(err)

assert.Equal(string(content), fact)
}

func TestGzipCompressor(t *testing.T) {
assert := assert.New(t)

buf := bytes.NewBufferString(fact)
c := GzipCompressor

r, err := c.Compress(buf)
assert.NoError(err)
assert.Equal(c.MediaTypeSuffix(), "gzip")

r, err = gzip.NewReader(r)
assert.NoError(err)

content, err := ioutil.ReadAll(r)
assert.NoError(err)

assert.Equal(string(content), fact)
}
37 changes: 9 additions & 28 deletions mutate/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ package mutate
import (
"io"
"reflect"
"runtime"
"time"

"github.com/apex/log"
gzip "github.com/klauspost/pgzip"
"github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/umoci/oci/cas"
Expand Down Expand Up @@ -232,38 +230,21 @@ func (m *Mutator) Set(ctx context.Context, config ispec.ImageConfig, meta Meta,
// add adds the given layer to the CAS, and mutates the configuration to
// include the diffID. The returned string is the digest of the *compressed*
// layer (which is compressed by us).
func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.History) (digest.Digest, int64, error) {
func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.History, compressor Compressor) (digest.Digest, int64, error) {
if err := m.cache(ctx); err != nil {
return "", -1, errors.Wrap(err, "getting cache failed")
}

diffidDigester := cas.BlobAlgorithm.Digester()
hashReader := io.TeeReader(reader, diffidDigester.Hash())

pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

gzw := gzip.NewWriter(pipeWriter)
defer gzw.Close()
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
return "", -1, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
compressed, err := compressor.Compress(hashReader)
if err != nil {
return "", -1, errors.Wrapf(err, "couldn't create compression for blob")
}
go func() {
if _, err := io.Copy(gzw, hashReader); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
}
if err := gzw.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close gzip writer"))
}
if err := pipeWriter.Close(); err != nil {
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "close pipe writer"))
}
}()
defer compressed.Close()

layerDigest, layerSize, err := m.engine.PutBlob(ctx, pipeReader)
layerDigest, layerSize, err := m.engine.PutBlob(ctx, compressed)
if err != nil {
return "", -1, errors.Wrap(err, "put layer blob")
}
Expand Down Expand Up @@ -291,20 +272,20 @@ func (m *Mutator) add(ctx context.Context, reader io.Reader, history *ispec.Hist
// generate the DiffIDs for the image metatadata. The provided history entry is
// appended to the image's history and should correspond to what operations
// were made to the configuration.
func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, history *ispec.History) (ispec.Descriptor, error) {
func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, history *ispec.History, compressor Compressor) (ispec.Descriptor, error) {
desc := ispec.Descriptor{}
if err := m.cache(ctx); err != nil {
return desc, errors.Wrap(err, "getting cache failed")
}

digest, size, err := m.add(ctx, r, history)
digest, size, err := m.add(ctx, r, history, compressor)
if err != nil {
return desc, errors.Wrap(err, "add layer")
}

// Append to layers.
desc = ispec.Descriptor{
MediaType: mediaType,
MediaType: mediaType + "+" + compressor.MediaTypeSuffix(),
Digest: digest,
Size: size,
}
Expand Down
4 changes: 2 additions & 2 deletions mutate/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ func TestMutateAdd(t *testing.T) {
buffer := bytes.NewBufferString("contents")

// Add a new layer.
newLayerDesc, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayerGzip, buffer, &ispec.History{
newLayerDesc, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, buffer, &ispec.History{
Comment: "new layer",
})
}, GzipCompressor)
if err != nil {
t.Fatalf("unexpected error adding layer: %+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion repack.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Repack(engineExt casext.Engine, tagName string, bundlePath string, meta Met

// TODO: We should add a flag to allow for a new layer to be made
// non-distributable.
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayerGzip, reader, history); err != nil {
if _, err := mutator.Add(context.Background(), ispec.MediaTypeImageLayer, reader, history, mutate.GzipCompressor); err != nil {
return errors.Wrap(err, "add diff layer")
}
}
Expand Down

0 comments on commit 5d0bf68

Please sign in to comment.