test for reader / sizing behavior on large files #34

Merged · 4 commits · Aug 22, 2022
Changes from 1 commit
4 changes: 2 additions & 2 deletions data/builder/file.go
@@ -165,11 +165,11 @@ func fileTreeRecursive(depth int, children []ipld.Link, childLen []uint64, src c
}
pbn := dpbb.Build()

- link, sz, err := sizedStore(ls, fileLinkProto, pbn)
+ link, _, err := sizedStore(ls, fileLinkProto, pbn)
if err != nil {
return nil, 0, err
}
- return link, totalSize + sz, nil
+ return link, totalSize, nil
}

// BuildUnixFSDirectoryEntry creates the link to a file or directory as it appears within a unixfs directory.
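For context on where that size value surfaces: it is accumulated by fileTreeRecursive and flows up into the size reported by builder.BuildUnixFSFile, which the new test below calls. A minimal sketch of a caller, assuming the same in-memory LinkSystem setup that file_test.go uses; the payload and the empty chunker string are arbitrary choices for illustration, not anything this PR adds:

package main

import (
	"bytes"
	"fmt"

	"github.com/ipfs/go-unixfsnode/data/builder"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

func main() {
	// In-memory LinkSystem, mirroring the setup in file_test.go below.
	ls := cidlink.DefaultLinkSystem()
	storage := cidlink.Memory{}
	ls.StorageReadOpener = storage.OpenRead
	ls.StorageWriteOpener = storage.OpenWrite

	payload := bytes.Repeat([]byte("unixfs!!"), 1<<17) // ~1 MiB of sample data

	// BuildUnixFSFile chunks the reader, stores the blocks through ls, and
	// returns the root link plus a cumulative size; after the hunk above the
	// total no longer adds the encoded size of the root node itself.
	root, size, err := builder.BuildUnixFSFile(bytes.NewReader(payload), "", &ls)
	if err != nil {
		panic(err)
	}
	fmt.Println(root, size)
}

Whether that reported total should include the root block's own encoded bytes is exactly what the hunk above changes.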
47 changes: 47 additions & 0 deletions file/file_test.go
@@ -7,7 +7,9 @@ import (
"io"
"testing"

ipfsutil "github.com/ipfs/go-ipfs-util"
"github.com/ipfs/go-unixfsnode"
"github.com/ipfs/go-unixfsnode/data/builder"
"github.com/ipfs/go-unixfsnode/directory"
"github.com/ipfs/go-unixfsnode/file"
"github.com/ipld/go-car/v2/blockstore"
@@ -61,6 +63,51 @@ func TestNamedV0File(t *testing.T) {
}
}

func TestLargeFileReader(t *testing.T) {
buf := make([]byte, 1024*1024*1024)
ipfsutil.NewSeededRand(0xdeadbeef).Read(buf)
r := bytes.NewReader(buf)

ls := cidlink.DefaultLinkSystem()
storage := cidlink.Memory{}
ls.StorageReadOpener = storage.OpenRead
ls.StorageWriteOpener = storage.OpenWrite

f, _, err := builder.BuildUnixFSFile(r, "", &ls)
if err != nil {
t.Fatal(err)
}

// get back the root node substrate from the link at the top of the builder.
fr, err := ls.Load(ipld.LinkContext{}, f, dagpb.Type.PBNode)
if err != nil {
t.Fatal(err)
}

ufn, err := file.NewUnixFSFile(context.Background(), fr, &ls)
if err != nil {
t.Fatal(err)
}
// read back out the file.
for i := 0; i < len(buf); i += 100 * 1024 * 1024 {
rs, err := ufn.AsLargeBytes()
if err != nil {
t.Fatal(err)
}
_, err = rs.Seek(int64(i), io.SeekStart)
if err != nil {
t.Fatal(err)
}
ob, err := io.ReadAll(rs)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ob, buf[i:]) {
t.Fatal("Not equal at offset", i, "expected", len(buf[i:]), "got", len(ob))
}
}
}

func open(car string, t *testing.T) (ipld.Node, *ipld.LinkSystem) {
baseStore, err := blockstore.OpenReadOnly(car)
if err != nil {
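TestLargeFileReader always reads from the seek offset to the end of the file. As a usage sketch of the same API, here is a hedged example that reads a bounded window instead; readRange, its parameters, and the package name are illustrative and not part of this PR:

package example

import (
	"context"
	"io"

	"github.com/ipfs/go-unixfsnode/file"
	"github.com/ipld/go-ipld-prime"
)

// readRange returns length bytes of file content starting at offset, using the
// same NewUnixFSFile / AsLargeBytes / Seek path the test above exercises.
func readRange(ctx context.Context, root ipld.Node, ls *ipld.LinkSystem, offset, length int64) ([]byte, error) {
	ufn, err := file.NewUnixFSFile(ctx, root, ls)
	if err != nil {
		return nil, err
	}
	rs, err := ufn.AsLargeBytes()
	if err != nil {
		return nil, err
	}
	if _, err := rs.Seek(offset, io.SeekStart); err != nil {
		return nil, err
	}
	// LimitReader bounds the read so only the shards covering the window are consumed.
	return io.ReadAll(io.LimitReader(rs, length))
}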
143 changes: 108 additions & 35 deletions file/shard.go
@@ -2,8 +2,11 @@ package file

import (
"context"
"fmt"
"io"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode/data"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
@@ -16,6 +19,10 @@ type shardNodeFile struct {
ctx context.Context
lsys *ipld.LinkSystem
substrate ipld.Node

+ // unixfs data unpacked from the substrate. access via .unpack()
+ metadata data.UnixFSData
+ unpackLk sync.Once
}

var _ ipld.Node = (*shardNodeFile)(nil)
@@ -24,6 +31,7 @@ type shardNodeReader struct {
*shardNodeFile
rdr io.Reader
offset int64
+ len int64
}

func (s *shardNodeReader) makeReader() (io.Reader, error) {
@@ -34,35 +42,34 @@ func (s *shardNodeReader) makeReader() (io.Reader, error) {
readers := make([]io.Reader, 0)
lnki := links.ListIterator()
at := int64(0)
+ lin := 0
for !lnki.Done() {
_, lnk, err := lnki.Next()
if err != nil {
return nil, err
}
- sz, err := lnk.LookupByString("Tsize")
- if err != nil {
- return nil, err
- }
- childSize, err := sz.AsInt()
+ childSize, tr, err := s.linkSize(lnk, lin)
if err != nil {
return nil, err
}
if s.offset > at+childSize {
at += childSize
continue
}
- lnkhash, err := lnk.LookupByString("Hash")
- if err != nil {
- return nil, err
- }
- lnklnk, err := lnkhash.AsLink()
- if err != nil {
- return nil, err
- }
- target := newDeferredFileNode(s.ctx, s.lsys, lnklnk)
- tr, err := target.AsLargeBytes()
- if err != nil {
- return nil, err
+ if tr == nil {
+ lnkhash, err := lnk.LookupByString("Hash")
+ if err != nil {
+ return nil, err
+ }
+ lnklnk, err := lnkhash.AsLink()
+ if err != nil {
+ return nil, err
+ }
+ target := newDeferredFileNode(s.ctx, s.lsys, lnklnk)
+ tr, err = target.AsLargeBytes()
+ if err != nil {
+ return nil, err
+ }
}
// fastforward the first one if needed.
if at < s.offset {
@@ -77,9 +84,86 @@ func (s *shardNodeReader) makeReader() (io.Reader, error) {
if len(readers) == 0 {
return nil, io.EOF
}
+ s.len = at
return io.MultiReader(readers...), nil
}

func (s *shardNodeFile) unpack() (data.UnixFSData, error) {
var retErr error
s.unpackLk.Do(func() {
nodeData, err := s.substrate.LookupByString("Data")
if err != nil {
retErr = err
return
}
nodeDataBytes, err := nodeData.AsBytes()
if err != nil {
retErr = err
return
}
ud, err := data.DecodeUnixFSData(nodeDataBytes)
if err != nil {
retErr = err
return
}
s.metadata = ud
})
return s.metadata, retErr
}

// returns the size of the n'th link from this shard.
// the io.ReadSeeker of the child will be returned if it was loaded as part of the size calculation.
func (s *shardNodeFile) linkSize(lnk ipld.Node, position int) (int64, io.ReadSeeker, error) {
lnkhash, err := lnk.LookupByString("Hash")
if err != nil {
return 0, nil, err
}
lnklnk, err := lnkhash.AsLink()
if err != nil {
return 0, nil, err
}
_, c, err := cid.CidFromBytes([]byte(lnklnk.Binary()))
if err != nil {
return 0, nil, err
}

// efficiency shortcut: for raw blocks, the size will match the bytes of content
if c.Prefix().Codec == cid.Raw {
size, err := lnk.LookupByString("Tsize")
if err != nil {
return 0, nil, err
}
sz, err := size.AsInt()
return sz, nil, err
}

// check if there are blocksizes written, use them if there are.
md, err := s.unpack()
if err == nil && md != nil {
pn, err := md.BlockSizes.LookupByIndex(int64(position))
if err == nil {
innerNum, err := pn.AsInt()
if err == nil {
return innerNum, nil, nil
}
}
}

// open the link and get its size.
target := newDeferredFileNode(s.ctx, s.lsys, lnklnk)
tr, err := target.AsLargeBytes()
if err != nil {
return 0, nil, err
}
fmt.Printf("had to get len by opening child.\n")
end, err := tr.Seek(0, io.SeekEnd)
if err != nil {
return end, nil, err
}
_, err = tr.Seek(0, io.SeekStart)
return end, tr, err
}

func (s *shardNodeReader) Read(p []byte) (int, error) {
// build reader
if s.rdr == nil {
@@ -110,23 +194,16 @@ func (s *shardNodeReader) Seek(offset int64, whence int) (int64, error) {

func (s *shardNodeFile) length() int64 {
// see if we have size specified in the unixfs data. errors fall back to length from links
- nodeData, err := s.substrate.LookupByString("Data")
- if err != nil {
- return s.lengthFromLinks()
- }
- nodeDataBytes, err := nodeData.AsBytes()
+ nodeData, err := s.unpack()
if err != nil {
return s.lengthFromLinks()
}
- ud, err := data.DecodeUnixFSData(nodeDataBytes)
- if err != nil {
- return s.lengthFromLinks()
- }
- if ud.FileSize.Exists() {
- if fs, err := ud.FileSize.Must().AsInt(); err == nil {
+ if nodeData.FileSize.Exists() {
+ if fs, err := nodeData.FileSize.Must().AsInt(); err == nil {
return int64(fs)
}
}

return s.lengthFromLinks()
}

@@ -138,15 +215,11 @@ func (s *shardNodeFile) lengthFromLinks() int64 {
size := int64(0)
li := links.ListIterator()
for !li.Done() {
- _, l, err := li.Next()
- if err != nil {
- return 0
- }
- sn, err := l.LookupByString("Tsize")
+ idx, l, err := li.Next()
if err != nil {
return 0
}
- ll, err := sn.AsInt()
+ ll, _, err := s.linkSize(l, int(idx))
if err != nil {
return 0
}
@@ -156,7 +229,7 @@ func (s *shardNodeFile) lengthFromLinks() int64 {
}

func (s *shardNodeFile) AsLargeBytes() (io.ReadSeeker, error) {
- return &shardNodeReader{s, nil, 0}, nil
+ return &shardNodeReader{s, nil, 0, 0}, nil
}

func protoFor(link ipld.Link) ipld.NodePrototype {
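For reference, the UnixFS metadata lookups that the new unpack, linkSize and length code paths rely on can also be exercised on their own. A sketch follows; sizesFromNode and its package are illustrative names rather than anything in this repository, but the individual calls mirror the diff above:

package example

import (
	"github.com/ipfs/go-unixfsnode/data"
	"github.com/ipld/go-ipld-prime"
)

// sizesFromNode decodes the UnixFS Data field of a dag-pb node and returns the
// declared FileSize plus the per-child BlockSizes, i.e. the fields the new
// unpack(), length() and linkSize() methods consult.
func sizesFromNode(substrate ipld.Node) (int64, []int64, error) {
	nodeData, err := substrate.LookupByString("Data")
	if err != nil {
		return 0, nil, err
	}
	nodeDataBytes, err := nodeData.AsBytes()
	if err != nil {
		return 0, nil, err
	}
	ud, err := data.DecodeUnixFSData(nodeDataBytes)
	if err != nil {
		return 0, nil, err
	}
	fileSize := int64(0)
	if ud.FileSize.Exists() {
		if fs, err := ud.FileSize.Must().AsInt(); err == nil {
			fileSize = fs
		}
	}
	var blockSizes []int64
	for i := int64(0); i < ud.BlockSizes.Length(); i++ {
		bs, err := ud.BlockSizes.LookupByIndex(i)
		if err != nil {
			return fileSize, blockSizes, err
		}
		sz, err := bs.AsInt()
		if err != nil {
			return fileSize, blockSizes, err
		}
		blockSizes = append(blockSizes, sz)
	}
	return fileSize, blockSizes, nil
}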