Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm: push tags integration - request flow #1347

Merged
merged 18 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/swarm/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand All @@ -47,7 +48,7 @@ func hashes(ctx *cli.Context) {
}
defer f.Close()

fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
refs, err := fileStore.GetAllReferences(context.TODO(), f, false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/swarm/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
return nil, err
}
Expand Down
30 changes: 4 additions & 26 deletions swarm/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/contracts/ens"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -188,15 +189,17 @@ type API struct {
feed *feed.Handler
fileStore *storage.FileStore
dns Resolver
Tags *chunk.Tags
Decryptor func(context.Context, string) DecryptFunc
}

// NewAPI the api constructor initialises a new API instance.
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) {
func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) {
self = &API{
fileStore: fileStore,
dns: dns,
feed: feedHandler,
Tags: tags,
Decryptor: func(ctx context.Context, credentials string) DecryptFunc {
return self.doDecrypt(ctx, credentials, pk)
},
Expand Down Expand Up @@ -297,31 +300,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto
return addr, nil
}

// Put provides singleton manifest creation on top of FileStore store
func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method was only used by tests. i think we should not export test helpers on public APIs. hence the functionality was removed from the API struct and duplicated wherever necessary

apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}

// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
Expand Down
107 changes: 98 additions & 9 deletions swarm/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ package api
import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"strings"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/testutil"
)

func init() {
Expand All @@ -41,26 +45,34 @@ func init() {
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))))
}

func testAPI(t *testing.T, f func(*API, bool)) {
func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
datadir, err := ioutil.TempDir("", "bzz-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
}
api := NewAPI(fileStore, nil, nil, nil)
f(api, false)
f(api, true)
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, false)
f(api, tags, true)
}

type testResponse struct {
reader storage.LazySectionReader
*Response
}

type Response struct {
MimeType string
Status int
Size int64
Content string
}

func checkResponse(t *testing.T, resp *testResponse, exp *Response) {

if resp.MimeType != exp.MimeType {
Expand Down Expand Up @@ -115,11 +127,11 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
}

func TestApiPut(t *testing.T) {
testAPI(t, func(api *API, toEncrypt bool) {
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
ctx := context.TODO()
addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -129,7 +141,26 @@ func TestApiPut(t *testing.T) {
}
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
testutil.CheckTag(t, tags, chunk.SPLIT, 2, 2) //1 chunk data, 1 chunk manifest
})
}

// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input
func xTestApiTagLarge(t *testing.T) {
zelig marked this conversation as resolved.
Show resolved Hide resolved
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
ctx := context.TODO()
_, wait, err := putRandomContent(ctx, api, 4096*4095, "text/plain", toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = wait(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
testutil.CheckTag(t, tags, chunk.SPLIT, 4129, 4129) //11 chunks random data, 1 chunk manifest
testutil.CheckTag(t, tags, chunk.SEEN, 0, 4129) //0 chunks seen, 12 total
})

}

// testResolver implements the Resolver interface and either returns the given
Expand Down Expand Up @@ -391,7 +422,7 @@ func TestDecryptOriginForbidden(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -425,7 +456,7 @@ func TestDecryptOrigin(t *testing.T) {
Access: &AccessEntry{Type: AccessTypePass},
}

api := NewAPI(nil, nil, nil, nil)
api := NewAPI(nil, nil, nil, nil, chunk.NewTags())

f := api.Decryptor(ctx, "")
err := f(me)
Expand Down Expand Up @@ -500,3 +531,61 @@ func TestDetectContentType(t *testing.T) {
})
}
}

// putRandomContent provides singleton manifest creation on top of API. it uploads an arbitrary byte stream
// of the desired contentLength and wraps it in a manifest
func putRandomContent(ctx context.Context, a *API, contentLength int, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))

tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, randomContentReader, int64(contentLength), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r := strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}

// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.New("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
tag.DoneSplit(key)
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}
31 changes: 31 additions & 0 deletions swarm/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err
return "", err
}
req.ContentLength = size
req.Header.Set("x-swarm-tag", fmt.Sprintf("raw_upload_%d", time.Now().Unix()))

res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
Expand Down Expand Up @@ -111,6 +113,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) {
type File struct {
io.ReadCloser
api.ManifestEntry
Tag string
}

// Open opens a local file which can then be passed to client.Upload to upload
Expand Down Expand Up @@ -139,6 +142,7 @@ func Open(path string) (*File, error) {
Size: stat.Size(),
ModTime: stat.ModTime(),
},
Tag: filepath.Base(path),
}, nil
}

Expand Down Expand Up @@ -422,6 +426,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro
// Uploader uploads files to swarm using a provided UploadFn
type Uploader interface {
Upload(UploadFn) error
Tag() string
}

type UploaderFunc func(UploadFn) error
Expand All @@ -430,12 +435,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error {
return u(upload)
}

func (u UploaderFunc) Tag() string {
return fmt.Sprintf("multipart_upload_%d", time.Now().Unix())
}

// DirectoryUploader implements Uploader
var _ Uploader = &DirectoryUploader{}

// DirectoryUploader uploads all files in a directory, optionally uploading
// a file to the default path
type DirectoryUploader struct {
Dir string
}

func (d *DirectoryUploader) Tag() string {
return filepath.Base(d.Dir)
}

// Upload performs the upload of the directory and default path
func (d *DirectoryUploader) Upload(upload UploadFn) error {
return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error {
Expand All @@ -458,11 +474,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error {
})
}

var _ Uploader = &FileUploader{}

// FileUploader uploads a single file
type FileUploader struct {
File *File
}

func (f *FileUploader) Tag() string {
return f.File.Tag
}

// Upload performs the upload of the file
func (f *FileUploader) Upload(upload UploadFn) error {
return upload(f.File)
Expand Down Expand Up @@ -509,6 +531,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
req.URL.RawQuery = q.Encode()
}

tag := uploader.Tag()
if tag == "" {
tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix())
}
log.Trace("setting upload tag", "tag", tag)

req.Header.Set("x-swarm-tag", tag)
acud marked this conversation as resolved.
Show resolved Hide resolved

// use 'Expect: 100-continue' so we don't send the request body if
// the server refuses the request
req.Header.Set("Expect", "100-continue")
Expand Down Expand Up @@ -574,6 +604,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error)

mw := multipart.NewWriter(reqW)
req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary()))
req.Header.Set("x-swarm-tag", fmt.Sprintf("multipart_upload_%d", time.Now().Unix()))
acud marked this conversation as resolved.
Show resolved Hide resolved

// define an UploadFn which adds files to the multipart form
uploadFn := func(file *File) error {
Expand Down
Loading