Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor Fetcher interface used for downloading migrations #8728

Merged
merged 7 commits into from
Feb 11, 2022
7 changes: 3 additions & 4 deletions repo/fsrepo/migrations/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,14 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
}
defer arcFile.Close()

// Open connection to download archive from ipfs path
rc, err := fetcher.Fetch(ctx, arcDistPath)
// Open connection to download archive from ipfs path and write to file
arcBytes, err := fetcher.Fetch(ctx, arcDistPath)
if err != nil {
return "", err
}
defer rc.Close()

// Write download data
_, err = io.Copy(arcFile, rc)
_, err = io.Copy(arcFile, bytes.NewReader(arcBytes))
if err != nil {
return "", err
}
Expand Down
14 changes: 4 additions & 10 deletions repo/fsrepo/migrations/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package migrations

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -96,14 +96,13 @@ func TestHttpFetch(t *testing.T) {

fetcher := NewHttpFetcher("", ts.URL, "", 0)

rc, err := fetcher.Fetch(ctx, "/versions")
out, err := fetcher.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(out))
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand Down Expand Up @@ -232,16 +231,11 @@ func TestMultiFetcher(t *testing.T) {

mf := NewMultiFetcher(badFetcher, fetcher)

rc, err := mf.Fetch(ctx, "/versions")
vers, err := mf.Fetch(ctx, "/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

vers, err := ioutil.ReadAll(rc)
if err != nil {
t.Fatal("could not read versions:", err)
}
if len(vers) < 45 {
fmt.Println("unexpected more data")
}
Expand Down
10 changes: 4 additions & 6 deletions repo/fsrepo/migrations/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ const (

type Fetcher interface {
// Fetch attempts to fetch the file at the given ipfs path.
// Returns io.ReadCloser on success, which caller must close.
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
Fetch(ctx context.Context, filePath string) ([]byte, error)
// Close performs any cleanup after the fetcher is not longer needed.
Close() error
}
Expand All @@ -48,13 +47,12 @@ func NewMultiFetcher(f ...Fetcher) Fetcher {
}

// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
// Returns io.ReadCloser on success, which caller must close.
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) {
var errs error
for _, fetcher := range f.fetchers {
rc, err := fetcher.Fetch(ctx, ipfsPath)
out, err := fetcher.Fetch(ctx, ipfsPath)
if err == nil {
return rc, nil
return out, nil
}
errs = multierror.Append(errs, err)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
14 changes: 9 additions & 5 deletions repo/fsrepo/migrations/httpfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http
}

// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// site configured for this HttpFetcher.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
gwURL := f.gateway + path.Join(f.distPath, filePath)
fmt.Printf("Fetching with HTTP: %q\n", gwURL)

Expand All @@ -89,10 +88,15 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
}

var rc io.ReadCloser
if f.limit != 0 {
return NewLimitReadCloser(resp.Body, f.limit), nil
rc = NewLimitReadCloser(resp.Body, f.limit)
} else {
rc = resp.Body
}
return resp.Body, nil
defer rc.Close()

return ioutil.ReadAll(rc)
}

func (f *HttpFetcher) Close() error {
Expand Down
16 changes: 11 additions & 5 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type IpfsFetcher struct {
addrInfo peer.AddrInfo
}

var _ migrations.Fetcher = (*IpfsFetcher)(nil)

// NewIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
Expand Down Expand Up @@ -85,9 +87,8 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe
}

// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// site configured for this HttpFetcher.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
// Initialize and start IPFS node on first call to Fetch, since the fetcher
// may be created by not used.
f.openOnce.Do(func() {
Expand Down Expand Up @@ -123,10 +124,15 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
return nil, fmt.Errorf("%q is not a file", filePath)
}

var rc io.ReadCloser
if f.limit != 0 {
return migrations.NewLimitReadCloser(fileNode, f.limit), nil
rc = migrations.NewLimitReadCloser(fileNode, f.limit)
} else {
rc = fileNode
}
return fileNode, nil
defer rc.Close()

return ioutil.ReadAll(rc)
}

func (f *IpfsFetcher) Close() error {
Expand Down
9 changes: 4 additions & 5 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipfsfetcher

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
Expand All @@ -28,14 +29,13 @@ func TestIpfsFetcher(t *testing.T) {
fetcher := NewIpfsFetcher("", 0, nil)
defer fetcher.Close()

rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
out, err := fetcher.Fetch(ctx, "go-ipfs/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(out))
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand All @@ -52,8 +52,7 @@ func TestIpfsFetcher(t *testing.T) {
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil {
if _, err = fetcher.Fetch(ctx, "/no_such_file"); err == nil {
t.Fatal("expected error 404")
}

Expand Down
5 changes: 3 additions & 2 deletions repo/fsrepo/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) {
// downloadSources,
func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
const httpUserAgent = "go-ipfs"
const numTriesPerHTTP = 3

var fetchers []Fetcher
for _, src := range downloadSources {
src := strings.TrimSpace(src)
switch src {
case "HTTPS", "https", "HTTP", "http":
fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0))
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP})
case "IPFS", "ipfs":
if newIpfsFetcher != nil {
fetchers = append(fetchers, newIpfsFetcher(distPath))
Expand All @@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch
default:
return nil, errors.New("bad gateway address: url scheme must be http or https")
}
fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0))
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP})
case "":
// Ignore empty string
}
Expand Down
13 changes: 9 additions & 4 deletions repo/fsrepo/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package migrations
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -290,7 +289,9 @@ func TestReadMigrationConfig(t *testing.T) {

type mockIpfsFetcher struct{}

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
var _ Fetcher = (*mockIpfsFetcher)(nil)

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
return nil, nil
}

Expand Down Expand Up @@ -323,7 +324,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}

Expand All @@ -341,7 +344,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}

Expand Down
33 changes: 33 additions & 0 deletions repo/fsrepo/migrations/retryfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package migrations

import (
"context"
"fmt"
)

type RetryFetcher struct {
Fetcher
MaxTries int
}

var _ Fetcher = (*RetryFetcher)(nil)

func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
var lastErr error
for i := 0; i < r.MaxTries; i++ {
out, err := r.Fetcher.Fetch(ctx, filePath)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
return out, nil
}

if ctx.Err() != nil {
return nil, ctx.Err()
}
lastErr = err
}
return nil, fmt.Errorf("exceeded number of retries. last error was %w", lastErr)
}

func (r *RetryFetcher) Close() error {
return r.Fetcher.Close()
}
6 changes: 3 additions & 3 deletions repo/fsrepo/migrations/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migrations

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable
// available on the distriburion site. List is in ascending order, unless
// sortDesc is true.
func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) {
rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
versionBytes, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
if err != nil {
return nil, err
}
defer rc.Close()

prefix := "v"
var vers []semver.Version

scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(bytes.NewReader(versionBytes))
for scan.Scan() {
ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix))
if err != nil {
Expand Down