diff --git a/data/builder/file.go b/data/builder/file.go
index 8e2e6f1..8309e42 100644
--- a/data/builder/file.go
+++ b/data/builder/file.go
@@ -165,11 +165,11 @@ func fileTreeRecursive(depth int, children []ipld.Link, childLen []uint64, src c
 		return nil, 0, err
 	}
 	pbn := dpbb.Build()
-	link, sz, err := sizedStore(ls, fileLinkProto, pbn)
+	link, _, err := sizedStore(ls, fileLinkProto, pbn)
 	if err != nil {
 		return nil, 0, err
 	}
-	return link, totalSize + sz, nil
+	return link, totalSize, nil
 }
 
 // BuildUnixFSDirectoryEntry creates the link to a file or directory as it appears within a unixfs directory.
diff --git a/file/large_file_test.go b/file/large_file_test.go
new file mode 100644
index 0000000..6dc64d2
--- /dev/null
+++ b/file/large_file_test.go
@@ -0,0 +1,67 @@
+//go:build !race
+// +build !race
+
+package file_test
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"strconv"
+	"testing"
+
+	ipfsutil "github.com/ipfs/go-ipfs-util"
+	"github.com/ipfs/go-unixfsnode/data/builder"
+	"github.com/ipfs/go-unixfsnode/file"
+	dagpb "github.com/ipld/go-codec-dagpb"
+	"github.com/ipld/go-ipld-prime"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+)
+
+func TestLargeFileReader(t *testing.T) {
+	if testing.Short() || strconv.IntSize == 32 {
+		t.Skip()
+	}
+	buf := make([]byte, 512*1024*1024)
+	ipfsutil.NewSeededRand(0xdeadbeef).Read(buf)
+	r := bytes.NewReader(buf)
+
+	ls := cidlink.DefaultLinkSystem()
+	storage := cidlink.Memory{}
+	ls.StorageReadOpener = storage.OpenRead
+	ls.StorageWriteOpener = storage.OpenWrite
+
+	f, _, err := builder.BuildUnixFSFile(r, "", &ls)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// get back the root node substrate from the link at the top of the builder.
+	fr, err := ls.Load(ipld.LinkContext{}, f, dagpb.Type.PBNode)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ufn, err := file.NewUnixFSFile(context.Background(), fr, &ls)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// read back out the file.
+	for i := 0; i < len(buf); i += 100 * 1024 * 1024 {
+		rs, err := ufn.AsLargeBytes()
+		if err != nil {
+			t.Fatal(err)
+		}
+		_, err = rs.Seek(int64(i), io.SeekStart)
+		if err != nil {
+			t.Fatal(err)
+		}
+		ob, err := io.ReadAll(rs)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !bytes.Equal(ob, buf[i:]) {
+			t.Fatal("Not equal at offset", i, "expected", len(buf[i:]), "got", len(ob))
+		}
+	}
+}
diff --git a/file/shard.go b/file/shard.go
index e3dc79b..0f08d14 100644
--- a/file/shard.go
+++ b/file/shard.go
@@ -3,7 +3,9 @@ package file
 import (
 	"context"
 	"io"
+	"sync"
 
+	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-unixfsnode/data"
 	dagpb "github.com/ipld/go-codec-dagpb"
 	"github.com/ipld/go-ipld-prime"
@@ -16,6 +18,10 @@ type shardNodeFile struct {
 	ctx       context.Context
 	lsys      *ipld.LinkSystem
 	substrate ipld.Node
+
+	// unixfs data unpacked from the substrate. access via .unpack()
+	metadata data.UnixFSData
+	unpackLk sync.Once
 }
 
 var _ ipld.Node = (*shardNodeFile)(nil)
@@ -24,6 +30,7 @@ type shardNodeReader struct {
 	*shardNodeFile
 	rdr    io.Reader
 	offset int64
+	len    int64
 }
 
 func (s *shardNodeReader) makeReader() (io.Reader, error) {
@@ -32,18 +39,14 @@ func (s *shardNodeReader) makeReader() (io.Reader, error) {
 	if err != nil {
 		return nil, err
 	}
 	readers := make([]io.Reader, 0)
-	lnki := links.ListIterator()
+	lnkIter := links.ListIterator()
 	at := int64(0)
-	for !lnki.Done() {
-		_, lnk, err := lnki.Next()
-		if err != nil {
-			return nil, err
-		}
-		sz, err := lnk.LookupByString("Tsize")
+	for !lnkIter.Done() {
+		lnkIdx, lnk, err := lnkIter.Next()
 		if err != nil {
 			return nil, err
 		}
-		childSize, err := sz.AsInt()
+		childSize, tr, err := s.linkSize(lnk, int(lnkIdx))
 		if err != nil {
 			return nil, err
 		}
@@ -51,18 +54,20 @@ func (s *shardNodeReader) makeReader() (io.Reader, error) {
 			at += childSize
 			continue
 		}
-		lnkhash, err := lnk.LookupByString("Hash")
-		if err != nil {
-			return nil, err
-		}
-		lnklnk, err := lnkhash.AsLink()
-		if err != nil {
-			return nil, err
-		}
-		target := newDeferredFileNode(s.ctx, s.lsys, lnklnk)
-		tr, err := target.AsLargeBytes()
-		if err != nil {
-			return nil, err
+		if tr == nil {
+			lnkhash, err := lnk.LookupByString("Hash")
+			if err != nil {
+				return nil, err
+			}
+			lnklnk, err := lnkhash.AsLink()
+			if err != nil {
+				return nil, err
+			}
+			target := newDeferredFileNode(s.ctx, s.lsys, lnklnk)
+			tr, err = target.AsLargeBytes()
+			if err != nil {
+				return nil, err
+			}
 		}
 		// fastforward the first one if needed.
 		if at < s.offset {
@@ -77,9 +82,88 @@ func (s *shardNodeReader) makeReader() (io.Reader, error) {
 	if len(readers) == 0 {
 		return nil, io.EOF
 	}
+	s.len = at
 	return io.MultiReader(readers...), nil
 }
 
+func (s *shardNodeFile) unpack() (data.UnixFSData, error) {
+	var retErr error
+	s.unpackLk.Do(func() {
+		nodeData, err := s.substrate.LookupByString("Data")
+		if err != nil {
+			retErr = err
+			return
+		}
+		nodeDataBytes, err := nodeData.AsBytes()
+		if err != nil {
+			retErr = err
+			return
+		}
+		ud, err := data.DecodeUnixFSData(nodeDataBytes)
+		if err != nil {
+			retErr = err
+			return
+		}
+		s.metadata = ud
+	})
+	return s.metadata, retErr
+}
+
+// returns the size of the n'th link from this shard.
+// the io.ReadSeeker of the child will be returned if it was loaded as part of the size calculation.
+func (s *shardNodeFile) linkSize(lnk ipld.Node, position int) (int64, io.ReadSeeker, error) {
+	lnkhash, err := lnk.LookupByString("Hash")
+	if err != nil {
+		return 0, nil, err
+	}
+	lnklnk, err := lnkhash.AsLink()
+	if err != nil {
+		return 0, nil, err
+	}
+	_, c, err := cid.CidFromBytes([]byte(lnklnk.Binary()))
+	if err != nil {
+		return 0, nil, err
+	}
+
+	// efficiency shortcut: for raw blocks, the size will match the bytes of content
+	if c.Prefix().Codec == cid.Raw {
+		size, err := lnk.LookupByString("Tsize")
+		if err != nil {
+			return 0, nil, err
+		}
+		sz, err := size.AsInt()
+		return sz, nil, err
+	}
+
+	// check if there are blocksizes written, use them if there are.
+	// both err and md can be nil if this is not the first call to unpack()
+	// and the first call returned an error.
+	md, err := s.unpack()
+	if err == nil && md != nil {
+		pn, err := md.BlockSizes.LookupByIndex(int64(position))
+		if err == nil {
+			innerNum, err := pn.AsInt()
+			if err == nil {
+				return innerNum, nil, nil
+			}
+		}
+	}
+
+	// open the link and get its size.
+	target := newDeferredFileNode(s.ctx, s.lsys, lnklnk)
+	tr, err := target.AsLargeBytes()
+	if err != nil {
+		return 0, nil, err
+	}
+
+	end, err := tr.Seek(0, io.SeekEnd)
+	if err != nil {
+		return end, nil, err
+	}
+	_, err = tr.Seek(0, io.SeekStart)
+	return end, tr, err
+}
+
 func (s *shardNodeReader) Read(p []byte) (int, error) {
 	// build reader
 	if s.rdr == nil {
@@ -110,23 +194,16 @@ func (s *shardNodeReader) Seek(offset int64, whence int) (int64, error) {
 
 func (s *shardNodeFile) length() int64 {
 	// see if we have size specified in the unixfs data. errors fall back to length from links
-	nodeData, err := s.substrate.LookupByString("Data")
-	if err != nil {
-		return s.lengthFromLinks()
-	}
-	nodeDataBytes, err := nodeData.AsBytes()
-	if err != nil {
+	nodeData, err := s.unpack()
+	if err != nil || nodeData == nil {
 		return s.lengthFromLinks()
 	}
-	ud, err := data.DecodeUnixFSData(nodeDataBytes)
-	if err != nil {
-		return s.lengthFromLinks()
-	}
-	if ud.FileSize.Exists() {
-		if fs, err := ud.FileSize.Must().AsInt(); err == nil {
+	if nodeData.FileSize.Exists() {
+		if fs, err := nodeData.FileSize.Must().AsInt(); err == nil {
 			return int64(fs)
 		}
 	}
+
 	return s.lengthFromLinks()
 }
 
@@ -138,15 +215,11 @@ func (s *shardNodeFile) lengthFromLinks() int64 {
 	size := int64(0)
 	li := links.ListIterator()
 	for !li.Done() {
-		_, l, err := li.Next()
-		if err != nil {
-			return 0
-		}
-		sn, err := l.LookupByString("Tsize")
+		idx, l, err := li.Next()
 		if err != nil {
 			return 0
 		}
-		ll, err := sn.AsInt()
+		ll, _, err := s.linkSize(l, int(idx))
 		if err != nil {
 			return 0
 		}
@@ -156,7 +229,7 @@ func (s *shardNodeFile) lengthFromLinks() int64 {
 }
 
 func (s *shardNodeFile) AsLargeBytes() (io.ReadSeeker, error) {
-	return &shardNodeReader{s, nil, 0}, nil
+	return &shardNodeReader{s, nil, 0, 0}, nil
 }
 
 func protoFor(link ipld.Link) ipld.NodePrototype {
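For context, a minimal sketch of the read path this diff optimizes. It reuses only APIs already exercised by large_file_test.go above (BuildUnixFSFile, NewUnixFSFile, AsLargeBytes); the 4 MiB buffer, default chunker, and halfway seek offset are illustrative choices, not part of the change:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/ipfs/go-unixfsnode/data/builder"
	"github.com/ipfs/go-unixfsnode/file"
	dagpb "github.com/ipld/go-codec-dagpb"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

func main() {
	// In-memory block store, wired the same way as in large_file_test.go.
	ls := cidlink.DefaultLinkSystem()
	storage := cidlink.Memory{}
	ls.StorageReadOpener = storage.OpenRead
	ls.StorageWriteOpener = storage.OpenWrite

	// Build a 4 MiB file; the default chunker splits it into multiple
	// leaves under a single sharded dag-pb root.
	root, _, err := builder.BuildUnixFSFile(bytes.NewReader(make([]byte, 4<<20)), "", &ls)
	if err != nil {
		panic(err)
	}

	// Load the root substrate and wrap it as a UnixFS file node.
	nd, err := ls.Load(ipld.LinkContext{Ctx: context.Background()}, root, dagpb.Type.PBNode)
	if err != nil {
		panic(err)
	}
	f, err := file.NewUnixFSFile(context.Background(), nd, &ls)
	if err != nil {
		panic(err)
	}

	rs, err := f.AsLargeBytes()
	if err != nil {
		panic(err)
	}
	// Seeking over the first half of the file is the case linkSize speeds
	// up: child sizes now come from the parent's unixfs blocksizes (or
	// Tsize for raw leaves) instead of loading every skipped child block.
	if _, err := rs.Seek(2<<20, io.SeekStart); err != nil {
		panic(err)
	}
	rest, err := io.ReadAll(rs)
	if err != nil {
		panic(err)
	}
	fmt.Println("read", len(rest), "bytes after seeking halfway")
}
```

The same seek previously resolved each skipped child through a deferred node load, so offsets deep into a large file touched every preceding shard; with linkSize consulting metadata first, only the children actually read are fetched.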