Skip to content

Commit

Permalink
ipfs,downloader: fix block download, verify integrity before caching
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Mar 6, 2024
1 parent 3dbf170 commit 827f904
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
26 changes: 16 additions & 10 deletions ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,31 @@ func (n *Node) AddPeer(addr peer.AddrInfo) {
}

// Pin pins a CID
func (n *Node) Pin(ctx context.Context, c cid.Cid, recursive bool) error {
log := n.log.Named("Pin").With(zap.Stringer("rootCID", c), zap.Bool("recursive", recursive))
func (n *Node) Pin(ctx context.Context, root cid.Cid, recursive bool) error {
log := n.log.Named("Pin").With(zap.Stringer("rootCID", root), zap.Bool("recursive", recursive))
if !recursive {
block, err := n.dagService.Get(ctx, c)
block, err := n.dagService.Get(ctx, root)
if err != nil {
return fmt.Errorf("failed to get block: %w", err)
} else if err := n.blockService.AddBlock(ctx, block); err != nil {
return fmt.Errorf("failed to add block: %w", err)
}
return n.provider.Provide(c)
return n.provider.Provide(root)
}

sess := merkledag.NewSession(ctx, n.dagService)
seen := make(map[cid.Cid]bool)
err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(sess), c, func(c cid.Cid) bool {
if seen[c] {
seen := make(map[string]bool)
err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(sess), root, func(c cid.Cid) bool {
var key string
switch c.Version() {
case 0:
key = cid.NewCidV1(c.Type(), c.Hash()).String()
case 1:
key = c.String()
}
if seen[key] {
return false
}
seen[c] = true

log := log.With(zap.Stringer("childCID", c))
log.Debug("pinning child")
// TODO: queue and handle these correctly
Expand All @@ -126,13 +131,14 @@ func (n *Node) Pin(ctx context.Context, c cid.Cid, recursive bool) error {
log.Error("failed to add block", zap.Error(err))
return false
}
seen[key] = true
log.Debug("pinned block")
return true
}, merkledag.Concurrent(), merkledag.IgnoreErrors())
if err != nil {
return fmt.Errorf("failed to walk DAG: %w", err)
}
return n.provider.Provide(c)
return n.provider.Provide(root)
}

// PinCAR pins all blocks in a CAR file to the node. The input reader
Expand Down
27 changes: 19 additions & 8 deletions renterd/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"bytes"
"container/heap"
"context"
"errors"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
mh "github.com/multiformats/go-multihash"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
Expand Down Expand Up @@ -135,7 +137,6 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger)
blockBuf := bytes.NewBuffer(make([]byte, 0, 1<<20))

start := time.Now()

bucket, key, err := bd.store.BlockLocation(task.cid)
if err != nil {
if !format.IsNotFound(err) {
Expand All @@ -147,12 +148,7 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger)
return
}

err = bd.workerClient.DownloadObject(context.Background(), blockBuf, bucket, key, api.DownloadObjectOptions{
Range: api.DownloadRange{
Offset: 0,
Length: 1 << 20,
},
})
err = bd.workerClient.DownloadObject(context.Background(), blockBuf, bucket, key, api.DownloadObjectOptions{})
if err != nil {
if !format.IsNotFound(err) {
log.Error("failed to download block", zap.Error(err))
Expand All @@ -162,8 +158,23 @@ func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger)
close(task.ch)
return
}
log.Info("downloaded block", zap.Duration("elapsed", time.Since(start)))

c := task.cid
h, err := mh.Sum(blockBuf.Bytes(), c.Prefix().MhType, -1)
if err != nil {
log.Error("failed to hash block", zap.Error(err))
task.err = err
bd.cache.Remove(cidKey(task.cid))
close(task.ch)
return
} else if c.Hash().HexString() != h.HexString() {
log.Error("block hash mismatch", zap.String("expected", c.Hash().HexString()), zap.String("actual", h.HexString()))
task.err = errors.New("block hash mismatch")
bd.cache.Remove(cidKey(task.cid))
close(task.ch)
return
}
log.Info("downloaded block", zap.Duration("elapsed", time.Since(start)))
task.b = blockBuf.Bytes()
close(task.ch)
}
Expand Down

0 comments on commit 827f904

Please sign in to comment.