Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swarm: push tags integration - request flow (#1347)
Browse files Browse the repository at this point in the history
swarm/api: integrate tags to count chunks being split and stored
swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly
swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package
  • Loading branch information
acud authored May 5, 2019
1 parent 9c5be52 commit c966633
Show file tree
Hide file tree
Showing 34 changed files with 699 additions and 362 deletions.
3 changes: 2 additions & 1 deletion cmd/swarm/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand All @@ -47,7 +48,7 @@ func hashes(ctx *cli.Context) {
}
defer f.Close()

fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
refs, err := fileStore.GetAllReferences(context.TODO(), f, false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/swarm/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
return nil, err
}
Expand Down
32 changes: 4 additions & 28 deletions swarm/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand All @@ -53,8 +54,6 @@ import (
var (
apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil)
apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil)
apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil)
apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil)
apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil)
apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil)
apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil)
Expand Down Expand Up @@ -188,15 +187,17 @@ type API struct {
feed *feed.Handler
fileStore *storage.FileStore
dns Resolver
Tags *chunk.Tags
Decryptor func(context.Context, string) DecryptFunc
}

// NewAPI the api constructor initialises a new API instance.
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) {
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) {
self = &API{
fileStore: fileStore,
dns: dns,
feed: feedHandler,
Tags: tags,
Decryptor: func(ctx context.Context, credentials string) DecryptFunc {
return self.doDecrypt(ctx, credentials, pk)
},
Expand Down Expand Up @@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto
return addr, nil
}

// Put provides singleton manifest creation on top of FileStore store
func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}

// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
Expand Down
108 changes: 91 additions & 17 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ package api
import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"strings"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/testutil"
)

func init() {
Expand All @@ -41,26 +45,35 @@ func init() {
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))))
}

func testAPI(t *testing.T, f func(*API, bool)) {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
if err != nil {
return
func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
for _, v := range []bool{true, false} {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
}
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, v)
}
api := NewAPI(fileStore, nil, nil, nil)
f(api, false)
f(api, true)
}

type testResponse struct {
reader storage.LazySectionReader
*Response
}

type Response struct {
MimeType string
Status int
Size int64
Content string
}

func checkResponse(t *testing.T, resp *testResponse, exp *Response) {

if resp.MimeType != exp.MimeType {
Expand Down Expand Up @@ -111,15 +124,14 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
}
reader.Seek(0, 0)
return &testResponse{reader, &Response{mimeType, status, size, string(s)}}
// return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}}
}

func TestApiPut(t *testing.T) {
testAPI(t, func(api *API, toEncrypt bool) {
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
ctx := context.TODO()
addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -129,6 +141,40 @@ func TestApiPut(t *testing.T) {
}
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
})
}

// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
tag, err := api.Tags.New("unnamed-tag", 0)
if err != nil {
t.Fatal(err)
}
ctx := sctx.SetTag(context.Background(), tag.Uid)
key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt)
if err != nil {
t.Fatal(err)
}
err = waitContent(ctx)
if err != nil {
t.Fatal(err)
}
tag.DoneSplit(key)

if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
testutil.CheckTag(t, tag, expect, expect, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
testutil.CheckTag(t, tag, expect, expect, 0, expect)
}
})
}

Expand Down Expand Up @@ -391,7 +437,7 @@ func TestDecryptOriginForbidden(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -425,7 +471,7 @@ func TestDecryptOrigin(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -500,3 +546,31 @@ func TestDetectContentType(t *testing.T) {
})
}
}

// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}
32 changes: 32 additions & 0 deletions swarm/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -75,6 +76,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err
return "", err
}
req.ContentLength = size
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix()))

res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
Expand Down Expand Up @@ -111,6 +114,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) {
type File struct {
io.ReadCloser
api.ManifestEntry
Tag string
}

// Open opens a local file which can then be passed to client.Upload to upload
Expand Down Expand Up @@ -139,6 +143,7 @@ func Open(path string) (*File, error) {
Size: stat.Size(),
ModTime: stat.ModTime(),
},
Tag: filepath.Base(path),
}, nil
}

Expand Down Expand Up @@ -422,6 +427,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro
// Uploader uploads files to swarm using a provided UploadFn
type Uploader interface {
Upload(UploadFn) error
Tag() string
}

type UploaderFunc func(UploadFn) error
Expand All @@ -430,12 +436,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error {
return u(upload)
}

func (u UploaderFunc) Tag() string {
return fmt.Sprintf("multipart_upload_%d", time.Now().Unix())
}

// DirectoryUploader implements Uploader
var _ Uploader = &DirectoryUploader{}

// DirectoryUploader uploads all files in a directory, optionally uploading
// a file to the default path
type DirectoryUploader struct {
Dir string
}

func (d *DirectoryUploader) Tag() string {
return filepath.Base(d.Dir)
}

// Upload performs the upload of the directory and default path
func (d *DirectoryUploader) Upload(upload UploadFn) error {
return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error {
Expand All @@ -458,11 +475,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error {
})
}

var _ Uploader = &FileUploader{}

// FileUploader uploads a single file
type FileUploader struct {
File *File
}

func (f *FileUploader) Tag() string {
return f.File.Tag
}

// Upload performs the upload of the file
func (f *FileUploader) Upload(upload UploadFn) error {
return upload(f.File)
Expand Down Expand Up @@ -509,6 +532,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
req.URL.RawQuery = q.Encode()
}

tag := uploader.Tag()
if tag == "" {
tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix())
}
log.Trace("setting upload tag", "tag", tag)

req.Header.Set(swarmhttp.SwarmTagHeaderName, tag)

// use 'Expect: 100-continue' so we don't send the request body if
// the server refuses the request
req.Header.Set("Expect", "100-continue")
Expand Down Expand Up @@ -574,6 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error)

mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))

// define an UploadFn which adds files to the multipart form
uploadFn := func(file *File) error {
Expand Down
Loading

0 comments on commit c966633

Please sign in to comment.