Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi progress bar to uploads #2134

Merged
merged 4 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
github.com/vbauerster/mpb/v8 v8.9.1
github.com/vincent-petithory/dataurl v1.0.0
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xeonx/timeago v1.0.0-rc5
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0
golang.org/x/sys v0.29.0
golang.org/x/term v0.27.0
golang.org/x/tools v0.28.0
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -51,6 +52,8 @@ require (
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0 // indirect
github.com/Masterminds/semver/v3 v3.3.0 // indirect
github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/alecthomas/go-check-sumtype v0.2.0 // indirect
github.com/alexkohler/nakedret/v2 v2.0.5 // indirect
github.com/alexkohler/prealloc v1.0.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+
github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/OpenPeeDeeP/depguard/v2 v2.2.0 h1:vDfG60vDtIuf0MEOhmLlLLSzqaRM8EMcgJPdp74zmpA=
github.com/OpenPeeDeeP/depguard/v2 v2.2.0/go.mod h1:CIzddKRvLBC4Au5aYP/i3nyaWQ+ClszLIuVocRiCYFQ=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alecthomas/assert/v2 v2.2.2 h1:Z/iVC0xZfWTaFNE6bA3z07T86hd45Xe2eLt6WVy2bbk=
github.com/alecthomas/assert/v2 v2.2.2/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
github.com/alecthomas/go-check-sumtype v0.2.0 h1:Bo+e4DFf3rs7ME9w/0SU/g6nmzJaphduP8Cjiz0gbwY=
Expand Down Expand Up @@ -664,6 +668,8 @@ github.com/uudashr/iface v1.2.1 h1:vHHyzAUmWZ64Olq6NZT3vg/z1Ws56kyPdBOd5kTXDF8=
github.com/uudashr/iface v1.2.1/go.mod h1:4QvspiRd3JLPAEXBQ9AiZpLbJlrWWgRChOKDJEuQTdg=
github.com/vbatts/tar-split v0.11.3 h1:hLFqsOLQ1SsppQNTMpkpPXClLDfC2A3Zgy9OUU+RVck=
github.com/vbatts/tar-split v0.11.3/go.mod h1:9QlHN18E+fEH7RdG+QAJJcuya3rqT7eXSTY7wGrAokY=
github.com/vbauerster/mpb/v8 v8.9.1 h1:LH5R3lXPfE2e3lIGxN7WNWv3Hl5nWO6LRi2B0L0ERHw=
github.com/vbauerster/mpb/v8 v8.9.1/go.mod h1:4XMvznPh8nfe2NpnDo1QTPvW9MVkUhbG90mPWvmOzcQ=
github.com/vincent-petithory/dataurl v1.0.0 h1:cXw+kPto8NLuJtlMsI152irrVw9fRDX8AbShPRpg2CI=
github.com/vincent-petithory/dataurl v1.0.0/go.mod h1:FHafX5vmDzyP+1CQATJn7WFKc9CvnvxyvZy6I1MrG/U=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
Expand Down Expand Up @@ -902,8 +908,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
62 changes: 49 additions & 13 deletions pkg/docker/fast_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"

"github.com/replicate/cog/pkg/global"
"github.com/replicate/cog/pkg/requirements"
Expand Down Expand Up @@ -55,6 +57,9 @@ func userAgent() string {

func FastPush(ctx context.Context, image string, projectDir string, command Command) error {
g, _ := errgroup.WithContext(ctx)
p := mpb.New(
mpb.WithRefreshRate(180 * time.Millisecond),
)

token, err := command.LoadLoginToken(global.ReplicateRegistryHost)
if err != nil {
Expand All @@ -69,7 +74,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
// Upload weights
for _, weight := range weights {
g.Go(func() error {
return uploadFile(ctx, weightsObjectType, weight.Digest, weight.Path, token)
return uploadFile(ctx, weightsObjectType, weight.Digest, weight.Path, token, p, "weights - "+filepath.Base(weight.Path))
})
}

Expand All @@ -84,7 +89,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
return err
}
g.Go(func() error {
return uploadFile(ctx, filesObjectType, hash, aptTarFile, token)
return uploadFile(ctx, filesObjectType, hash, aptTarFile, token, p, "apt")
})
}

Expand All @@ -104,7 +109,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
return err
}
g.Go(func() error {
return uploadFile(ctx, filesObjectType, hash, pythonTar, token)
return uploadFile(ctx, filesObjectType, hash, pythonTar, token, p, "python-packages")
})
} else {
requirementsTarFile := filepath.Join(tmpDir, requirementsTarFile)
Expand All @@ -127,7 +132,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
return err
}
g.Go(func() error {
return uploadFile(ctx, filesObjectType, hash, srcTar, token)
return uploadFile(ctx, filesObjectType, hash, srcTar, token, p, "src")
})

// Wait for uploads
Expand Down Expand Up @@ -187,9 +192,45 @@ func checkVerificationStatus(req *http.Request, client *http.Client) (bool, erro
return false, nil
}

func uploadFile(ctx context.Context, objectType string, digest string, path string, token string) error {
func uploadFile(ctx context.Context, objectType string, digest string, path string, token string, p *mpb.Progress, desc string) error {
console.Debug("uploading file: " + path)

// Open the file for uploading
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()

// Find the file size
fileInfo, err := file.Stat()
if err != nil {
return err
}

// Start the progress bar
trimDesc := desc
if len(trimDesc) > 20 {
trimDesc = trimDesc[:20]
}
if len(trimDesc) < 20 {
trimDesc += strings.Repeat(" ", 20-len(trimDesc))
}
bar := p.New(fileInfo.Size(),
mpb.BarStyle().Rbound("|"),
mpb.PrependDecorators(
decor.Name(trimDesc+" "),
decor.Counters(decor.SizeB1024(0), "% .2f / % .2f"),
),
mpb.AppendDecorators(
decor.EwmaETA(decor.ET_STYLE_GO, 30),
decor.Name(" ] "),
decor.EwmaSpeed(decor.SizeB1024(0), "% .2f", 30),
),
)
defer bar.Abort(false)

// Declare that we want to upload a file.
uploadUrl := startUploadURL(objectType, digest)
client := &http.Client{}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadUrl.String(), nil)
Expand Down Expand Up @@ -219,13 +260,6 @@ func uploadFile(ctx context.Context, objectType string, digest string, path stri
return err
}

// Open the file for uploading
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()

// Upload the file using an S3 client
console.Debug("multi-part uploading file: " + path)
cfg := aws.NewConfig()
Expand All @@ -244,10 +278,12 @@ func uploadFile(ctx context.Context, objectType string, digest string, path stri
u.PartSize = 64 * 1024 * 1024 // 64MB per part
})

proxyReader := bar.ProxyReader(file)
defer proxyReader.Close()
uploadParams := &s3.PutObjectInput{
Bucket: aws.String(data.Bucket),
Key: aws.String(data.Key),
Body: file,
Body: proxyReader,
}
_, err = uploader.Upload(ctx, uploadParams)
if err != nil {
Expand Down