Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swarm: address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed May 5, 2019
1 parent 8c71b9f commit f628f74
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 41 deletions.
9 changes: 7 additions & 2 deletions swarm/api/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler {
})
}

// InitUploadTag creates a new tag for an upload to the local HTTP proxy
// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used
// when the Content-Length header is set, an ETA on chunking will be available since the
// number of chunks to be split is known in advance (not including enclosing manifest chunks)
// the tag can later be accessed using the appropriate identifier in the request context
func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
Expand All @@ -109,9 +114,9 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
if uri != nil {
log.Debug("got uri from context")
if uri.Addr == "encrypt" {
estimatedTotal = CalculateNumberOfChunks(r.ContentLength, true)
estimatedTotal = calculateNumberOfChunks(r.ContentLength, true)
} else {
estimatedTotal = CalculateNumberOfChunks(r.ContentLength, false)
estimatedTotal = calculateNumberOfChunks(r.ContentLength, false)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions swarm/api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,8 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize))
}

func CalculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length
func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
if contentLength < 4096 {
return 1
}
Expand All @@ -876,7 +877,7 @@ func CalculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {

dataChunks := math.Ceil(float64(contentLength) / float64(4096))
totalChunks := dataChunks
intermediate := float64(dataChunks) / float64(branchingFactor)
intermediate := dataChunks / float64(branchingFactor)

for intermediate > 1 {
totalChunks += math.Ceil(intermediate)
Expand Down
45 changes: 26 additions & 19 deletions swarm/api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,18 +848,17 @@ func testBzzTar(encrypted bool, t *testing.T) {
// by chunk size (4096). It is needed to be checked BEFORE chunking is done, therefore
// concurrency was introduced to slow down the HTTP request
func TestBzzCorrectTagEstimate(t *testing.T) {
srv := NewTestSwarmServer(t, serverFunc, nil)
defer srv.Close()

for _, v := range []struct {
toEncrypt bool
expChunks int64
}{
{toEncrypt: false, expChunks: 248},
{toEncrypt: true, expChunks: 250},
} {
srv := NewTestSwarmServer(t, serverFunc, nil)
defer srv.Close()

pr, pw := io.Pipe()
c := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -879,27 +878,35 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
go func() {
for {
select {
case <-c:
case <-ctx.Done():
return
default:
case <-time.After(1 * time.Millisecond):
_, err := pw.Write([]byte{0})
if err != nil {
return
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
}
}()

client := &http.Client{}
_, err = client.Do(req)
if err != nil {
t.Log(err)
go func() {
transport := http.DefaultTransport
_, err := transport.RoundTrip(req)
if err != nil {
t.Error(err)
}
}()
done := false
for !done {
switch len(srv.Tags.All()) {
case 0:
<-time.After(10 * time.Millisecond)
case 1:
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks)
srv.Tags.Delete(tag.Uid)
done = true
}
}
time.Sleep(100 * time.Millisecond)
tag := srv.Tags.All()[0]
testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks)
close(c)
}
}

Expand Down Expand Up @@ -1344,7 +1351,7 @@ func TestCalculateNumberOfChunks(t *testing.T) {
{len: 1000000, chunks: 248},
{len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1},
} {
res := CalculateNumberOfChunks(tc.len, false)
res := calculateNumberOfChunks(tc.len, false)
if res != tc.chunks {
t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
}
Expand All @@ -1364,7 +1371,7 @@ func TestCalculateNumberOfChunksEncrypted(t *testing.T) {
{len: 1000000, chunks: 245 + 4 + 1},
{len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1},
} {
res := CalculateNumberOfChunks(tc.len, true)
res := calculateNumberOfChunks(tc.len, true)
if res != tc.chunks {
t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
}
Expand Down
16 changes: 7 additions & 9 deletions swarm/chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,12 @@ func (t *Tag) ETA(state State) (time.Time, error) {
func (tag *Tag) MarshalBinary() (data []byte, err error) {
buffer := make([]byte, 4)
binary.BigEndian.PutUint32(buffer, tag.Uid)
//encodeUint64Append(&buffer, tag.Uid)
encodeUint64Append(&buffer, tag.total)
encodeUint64Append(&buffer, tag.split)
encodeUint64Append(&buffer, tag.seen)
encodeUint64Append(&buffer, tag.stored)
encodeUint64Append(&buffer, tag.sent)
encodeUint64Append(&buffer, tag.synced)
encodeInt64Append(&buffer, tag.total)
encodeInt64Append(&buffer, tag.split)
encodeInt64Append(&buffer, tag.seen)
encodeInt64Append(&buffer, tag.stored)
encodeInt64Append(&buffer, tag.sent)
encodeInt64Append(&buffer, tag.synced)

intBuffer := make([]byte, 8)

Expand All @@ -185,7 +184,6 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error {
tag.Uid = binary.BigEndian.Uint32(buffer)
buffer = buffer[4:]

//tag.Uid = decodeInt64Splice(&buffer)
tag.total = decodeInt64Splice(&buffer)
tag.split = decodeInt64Splice(&buffer)
tag.seen = decodeInt64Splice(&buffer)
Expand All @@ -207,7 +205,7 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error {
return nil
}

func encodeUint64Append(buffer *[]byte, val int64) {
func encodeInt64Append(buffer *[]byte, val int64) {
intBuffer := make([]byte, 8)
n := binary.PutVarint(intBuffer, val)
*buffer = append(*buffer, intBuffer[:n]...)
Expand Down
9 changes: 7 additions & 2 deletions swarm/chunk/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (ts *Tags) New(s string, total int64) (*Tag, error) {
}

// All returns all existing tags in Tags' sync.Map
// Note that tags are returned in no particular order
func (ts *Tags) All() (t []*Tag) {
ts.tags.Range(func(k, v interface{}) bool {
t = append(t, v.(*Tag))
Expand All @@ -75,8 +76,8 @@ func (ts *Tags) Get(uid uint32) (*Tag, error) {
return t.(*Tag), nil
}

// GetContext gets a tag from the tag uid stored in the context
func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) {
// GetFromContext gets a tag from the tag uid stored in the context
func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) {
uid := sctx.GetTag(ctx)
t, ok := ts.tags.Load(uid)
if !ok {
Expand All @@ -89,3 +90,7 @@ func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) {
func (ts *Tags) Range(fn func(k, v interface{}) bool) {
ts.tags.Range(fn)
}

func (ts *Tags) Delete(k interface{}) {
ts.tags.Delete(k)
}
11 changes: 6 additions & 5 deletions swarm/chunk/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ func TestAll(t *testing.T) {
t.Fatalf("expected length to be 2 got %d", len(all))
}

if all[0].Total() != 1 {
t.Fatal("mismatch")
if n := all[0].Total(); n != 1 {
t.Fatalf("expected tag 0 total to be 1 got %d", n)
}
if all[1].Total() != 1 {
t.Fatal("mismatch")

if n := all[1].Total(); n != 1 {
t.Fatalf("expected tag 1 total to be 1 got %d", n)
}

ts.New("3", 1)
all = ts.All()

if len(all) != 3 {
t.Fatalf("expected length to be 2 got %d", len(all))
t.Fatalf("expected length to be 3 got %d", len(all))
}

}
4 changes: 2 additions & 2 deletions swarm/storage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewFileStore(store ChunkStore, params *FileStoreParams, tags *chunk.Tags) *
// It returns a reader with the chunk data and whether the content was encrypted
func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) {
isEncrypted = len(addr) > f.hashFunc().Size()
tag, err := f.tags.GetContext(ctx)
tag, err := f.tags.GetFromContext(ctx)
if err != nil {
tag = chunk.NewTag(0, "ephemeral-retrieval-tag", 0)
}
Expand All @@ -97,7 +97,7 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu
// Store is a public API. Main entry point for document storage directly. Used by the
// FS-aware API and httpaccess
func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) {
tag, err := f.tags.GetContext(ctx)
tag, err := f.tags.GetFromContext(ctx)
if err != nil {
// some of the parts of the codebase, namely the manifest trie, do not store the context
// of the original request nor the tag with the trie, recalculating the trie hence
Expand Down

0 comments on commit f628f74

Please sign in to comment.